capistrant commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r529982981
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -377,6 +377,11 @@ protected boolean
useExclusiveStartSequenceNumberForNonFirstSequence()
return true;
}
+ @Override
+ protected void collectLag(ArrayList<Long> lags)
+ {
Review comment:
add comment stating why this is not implemented
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
##########
@@ -85,7 +88,9 @@ public KinesisSupervisorIOConfig(
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
+ dynamicAllocationTasksProperties,
lateMessageRejectionStartDateTime
+
Review comment:
nit: remove empty line
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -518,20 +684,52 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.dynamicAllocationTasksProperties =
ioConfig.getDynamicAllocationTasksProperties();
+ log.info("Get dynamicAllocationTasksProperties from IOConfig : " +
dynamicAllocationTasksProperties);
+
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
+ log.info("EnableDynamicAllocationTasks for datasource " + dataSource);
+ this.enableDynamicAllocationTasks = true;
+ } else {
+ log.info("Disable Dynamic Allocate Tasks");
+ this.enableDynamicAllocationTasks = false;
+ }
+ int taskCountMax = 0;
+ if (enableDynamicAllocationTasks) {
+ this.metricsCollectionIntervalMillis =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionIntervalMillis",
10000)));
+ this.metricsCollectionRangeMillis =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionRangeMillis",
6 * 10 * 1000)));
+ int slots = (int) (metricsCollectionRangeMillis /
metricsCollectionIntervalMillis) + 1;
+ log.info(" The interval of metrics collection is " +
metricsCollectionIntervalMillis + ", " + metricsCollectionRangeMillis + "
timeRange will collect " + slots + " data points at most.");
+ this.queue = new CircularFifoQueue<>(slots);
+ taskCountMax =
Integer.parseInt(String.valueOf(this.dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
8)));
Review comment:
what is the reasoning behind this default of 8?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
##########
@@ -46,6 +48,7 @@
private final Optional<Duration> lateMessageRejectionPeriod;
private final Optional<Duration> earlyMessageRejectionPeriod;
private final Optional<DateTime> lateMessageRejectionStartDateTime;
+ private final Map<String, Object> dynamicAllocationTasksProperties;
Review comment:
as your comment says below, this could be null. Should we annotate as
nullable?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ // if supervisor is not suspended, ensure required tasks are running
+ // if suspended, ensure tasks have been requested to gracefully stop
+ log.info("[%s] supervisor is running, start to check dynamic allocate task
logic", dataSource);
+ long scaleOutThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
5000000)));
+ long scaleInThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
1000000)));
+ double triggerSaleOutThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
0.3)));
+ double triggerSaleInThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleInThresholdFrequency",
0.8)));
+ int taskCountMax =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
8)));
+ int taskCountMin =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMin",
1)));
+ int scaleInStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInStep",
1)));
+ int scaleOutStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutStep",
2)));
+ int beyond = 0;
+ int within = 0;
+ int metricsCount = lags.size();
+ for (Long lag : lags) {
+ if (lag >= scaleOutThreshold) {
+ beyond++;
+ }
+ if (lag <= scaleInThreshold) {
+ within++;
+ }
+ }
+ double beyondProportion = beyond * 1.0 / metricsCount;
+ double withinProportion = within * 1.0 / metricsCount;
+ log.info("triggerSaleOutThresholdFrequency is [ " +
triggerSaleOutThresholdFrequency + " ] and triggerSaleInThresholdFrequency is [
" + triggerSaleInThresholdFrequency + " ]");
+ log.info("beyondProportion is [ " + beyondProportion + " ] and
withinProportion is [ " + withinProportion + " ]");
+
+ int currentActiveTaskCount;
+ int desireActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (beyondProportion >= triggerSaleOutThresholdFrequency) {
+ // Do Scale out
+ int taskCount = currentActiveTaskCount + scaleOutStep;
+ if (currentActiveTaskCount == taskCountMax) {
+ log.info("CurrentActiveTaskCount reach task count Max limit, skip to
scale out tasks");
+ return false;
+ } else {
+ desireActiveTaskCount = Math.min(taskCount, taskCountMax);
+ }
+ log.info("Start to scale out tasks , current active task number [ " +
currentActiveTaskCount + " ] and desire task number is [ " +
desireActiveTaskCount + " ] ");
+ gracefulShutdownInternal();
+ // clear everything
+ clearAllocationInfos();
+ log.info("Set Task Count : " + desireActiveTaskCount);
+ setTaskCount(desireActiveTaskCount);
+ return true;
+ }
+
+ if (withinProportion >= triggerSaleInThresholdFrequency) {
+ // Do Scale in
+ int taskCount = currentActiveTaskCount - scaleInStep;
+ if (currentActiveTaskCount == taskCountMin) {
+ log.info("CurrentActiveTaskCount reach task count Min limit, skip to
scale in tasks");
+ return false;
+ } else {
+ desireActiveTaskCount = Math.max(taskCount, taskCountMin);
+ }
+ log.info("Start to scale in tasks , current active task number [ " +
currentActiveTaskCount + " ] and desire task number is [ " +
desireActiveTaskCount + " ] ");
+ gracefulShutdownInternal();
+ // clear everything
+ clearAllocationInfos();
+ log.info("Set Task Count : " + desireActiveTaskCount);
+ setTaskCount(desireActiveTaskCount);
+ return true;
+ }
+ return false;
+ }
+
+ private void setTaskCount(int desireActiveTaskCount)
Review comment:
I think this method deserves a more specific name as it is actually
re-submitting the supervisor. Perhaps `submitSupervisorWithTaskCount` or
something of that sort?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -518,20 +684,52 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.dynamicAllocationTasksProperties =
ioConfig.getDynamicAllocationTasksProperties();
+ log.info("Get dynamicAllocationTasksProperties from IOConfig : " +
dynamicAllocationTasksProperties);
+
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
+ log.info("EnableDynamicAllocationTasks for datasource " + dataSource);
+ this.enableDynamicAllocationTasks = true;
+ } else {
+ log.info("Disable Dynamic Allocate Tasks");
+ this.enableDynamicAllocationTasks = false;
+ }
+ int taskCountMax = 0;
+ if (enableDynamicAllocationTasks) {
+ this.metricsCollectionIntervalMillis =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionIntervalMillis",
10000)));
Review comment:
wondering if it would be better to have all these defaults be final
constants instantiated at top of class for easy reference?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ // if supervisor is not suspended, ensure required tasks are running
+ // if suspended, ensure tasks have been requested to gracefully stop
+ log.info("[%s] supervisor is running, start to check dynamic allocate task
logic", dataSource);
+ long scaleOutThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
5000000)));
+ long scaleInThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
1000000)));
+ double triggerSaleOutThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
0.3)));
+ double triggerSaleInThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleInThresholdFrequency",
0.8)));
+ int taskCountMax =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
8)));
+ int taskCountMin =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMin",
1)));
+ int scaleInStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInStep",
1)));
+ int scaleOutStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutStep",
2)));
+ int beyond = 0;
+ int within = 0;
+ int metricsCount = lags.size();
+ for (Long lag : lags) {
+ if (lag >= scaleOutThreshold) {
+ beyond++;
+ }
+ if (lag <= scaleInThreshold) {
+ within++;
+ }
+ }
+ double beyondProportion = beyond * 1.0 / metricsCount;
+ double withinProportion = within * 1.0 / metricsCount;
+ log.info("triggerSaleOutThresholdFrequency is [ " +
triggerSaleOutThresholdFrequency + " ] and triggerSaleInThresholdFrequency is [
" + triggerSaleInThresholdFrequency + " ]");
+ log.info("beyondProportion is [ " + beyondProportion + " ] and
withinProportion is [ " + withinProportion + " ]");
+
+ int currentActiveTaskCount;
+ int desireActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (beyondProportion >= triggerSaleOutThresholdFrequency) {
+ // Do Scale out
+ int taskCount = currentActiveTaskCount + scaleOutStep;
+ if (currentActiveTaskCount == taskCountMax) {
+ log.info("CurrentActiveTaskCount reach task count Max limit, skip to
scale out tasks");
+ return false;
+ } else {
+ desireActiveTaskCount = Math.min(taskCount, taskCountMax);
+ }
+ log.info("Start to scale out tasks , current active task number [ " +
currentActiveTaskCount + " ] and desire task number is [ " +
desireActiveTaskCount + " ] ");
+ gracefulShutdownInternal();
+ // clear everything
+ clearAllocationInfos();
+ log.info("Set Task Count : " + desireActiveTaskCount);
+ setTaskCount(desireActiveTaskCount);
+ return true;
+ }
+
+ if (withinProportion >= triggerSaleInThresholdFrequency) {
+ // Do Scale in
+ int taskCount = currentActiveTaskCount - scaleInStep;
+ if (currentActiveTaskCount == taskCountMin) {
+ log.info("CurrentActiveTaskCount reach task count Min limit, skip to
scale in tasks");
+ return false;
+ } else {
+ desireActiveTaskCount = Math.max(taskCount, taskCountMin);
+ }
+ log.info("Start to scale in tasks , current active task number [ " +
currentActiveTaskCount + " ] and desire task number is [ " +
desireActiveTaskCount + " ] ");
+ gracefulShutdownInternal();
+ // clear everything
+ clearAllocationInfos();
+ log.info("Set Task Count : " + desireActiveTaskCount);
+ setTaskCount(desireActiveTaskCount);
+ return true;
+ }
+ return false;
+ }
+
+ private void setTaskCount(int desireActiveTaskCount)
Review comment:
what are the consequences of failure at this point? we have called
gracefulShutdownInternal so I assume we will be left with no active supervisor
for the datasource?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
}
}
+ private Runnable collectAndcollectLags()
+ {
+ return new Runnable() {
+ @Override
+ public void run()
Review comment:
logs should provide context about what supervisor they are referring. As
in other places, lets assess what can be changed to debug to reduce chattiness
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3561,4 +3843,6 @@ protected void emitLag()
* sequences. In Kafka, start offsets are always inclusive.
*/
protected abstract boolean
useExclusiveStartSequenceNumberForNonFirstSequence();
+
+ protected abstract void collectLag(ArrayList<Long> lags);
Review comment:
javadoc please
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -652,6 +857,11 @@ public void stop(boolean stopGracefully)
try {
scheduledExec.shutdownNow(); // stop recurring executions
reportingExec.shutdownNow();
+ log.info("Shut Down allocationExec now");
Review comment:
I don't think this log or the one below is needed since there aren't
logs for the other Execs
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3526,6 +3789,25 @@ protected void emitLag()
}
}
+
+ protected void computeLags(Map<PartitionIdType, Long> partitionLags,
ArrayList<Long> lags)
Review comment:
pretty straightforward method, but a short javadoc would be nice since
we are updating an important lag related object
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
}
}
+ private Runnable collectAndcollectLags()
Review comment:
is this supposed to be `collectAndComputeLags()`? As far as I can tell,
the log on line 982 seems to suggest that is the name you may have meant to use
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
Review comment:
javadoc would be helpful as this is complex/important method override
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
Review comment:
javadoc would be helpful as this is important/complex method
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
##########
@@ -113,12 +119,23 @@ public Integer getReplicas()
return replicas;
}
+ @JsonProperty
Review comment:
should this be annotated as nullable if the instance can be null as your
comment in the constructor suggests?
##########
File path:
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
##########
@@ -824,12 +827,32 @@ private static SeekableStreamSupervisorIOConfig
getIOConfig()
false,
new Period("PT30M"),
null,
- null, null
+ null, getProperties(), null
)
{
};
}
+ private static Map<String, Object> getProperties()
+ {
+ HashMap<String, Object> dynamicAllocationTasksProperties = new HashMap<>();
Review comment:
we need to document all of these new configs in kafka-ingestion.md in
the `KafkaSupervisorIOConfig` section
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -495,6 +655,12 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable
TaskGroup taskGroup)
private volatile boolean stopped = false;
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ private final boolean enableDynamicAllocationTasks;
+ private volatile long metricsCollectionIntervalMillis;
+ private volatile long metricsCollectionRangeMillis;
+ private volatile long dynamicCheckStartDelayMillis;
+ private volatile long dynamicCheckPeriod;
+ private volatile CircularFifoQueue<Long> queue;
Review comment:
I think refactoring with a more descriptive name would be beneficial for
readability
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -518,20 +684,52 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.dynamicAllocationTasksProperties =
ioConfig.getDynamicAllocationTasksProperties();
+ log.info("Get dynamicAllocationTasksProperties from IOConfig : " +
dynamicAllocationTasksProperties);
+
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
+ log.info("EnableDynamicAllocationTasks for datasource " + dataSource);
+ this.enableDynamicAllocationTasks = true;
+ } else {
+ log.info("Disable Dynamic Allocate Tasks");
+ this.enableDynamicAllocationTasks = false;
+ }
+ int taskCountMax = 0;
+ if (enableDynamicAllocationTasks) {
+ this.metricsCollectionIntervalMillis =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionIntervalMillis",
10000)));
+ this.metricsCollectionRangeMillis =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionRangeMillis",
6 * 10 * 1000)));
+ int slots = (int) (metricsCollectionRangeMillis /
metricsCollectionIntervalMillis) + 1;
+ log.info(" The interval of metrics collection is " +
metricsCollectionIntervalMillis + ", " + metricsCollectionRangeMillis + "
timeRange will collect " + slots + " data points at most.");
+ this.queue = new CircularFifoQueue<>(slots);
+ taskCountMax =
Integer.parseInt(String.valueOf(this.dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
8)));
+ this.dynamicCheckStartDelayMillis =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("dynamicCheckStartDelayMillis",
300000)));
+ this.dynamicCheckPeriod =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("dynamicCheckPeriod",
600000)));
+ this.metricsCollectionRangeMillis =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionRangeMillis",
600000)));
+ }
+
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
this.exec = Execs.singleThreaded(supervisorId);
this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId +
"-Scheduler-%d");
this.reportingExec = Execs.scheduledSingleThreaded(supervisorId +
"-Reporting-%d");
+ this.allocationExec = Execs.scheduledSingleThreaded(supervisorId +
"-Allocation-%d");
+ this.lagComputationExec = Execs.scheduledSingleThreaded(supervisorId +
"-Computation-%d");
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
spec.isSuspended()
);
- int workerThreads = (this.tuningConfig.getWorkerThreads() != null
- ? this.tuningConfig.getWorkerThreads()
- : Math.min(10, this.ioConfig.getTaskCount()));
+ int workerThreads;
+ if (enableDynamicAllocationTasks) {
+ workerThreads = (this.tuningConfig.getWorkerThreads() != null
+ ? this.tuningConfig.getWorkerThreads()
+ : Math.min(10, taskCountMax));
+ } else {
+ workerThreads = (this.tuningConfig.getWorkerThreads() != null
+ ? this.tuningConfig.getWorkerThreads()
+ : Math.min(10, this.ioConfig.getTaskCount()));
+ }
this.workerExec =
MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads,
supervisorId + "-Worker-%d"));
log.info("Created worker pool with [%d] threads for dataSource [%s]",
workerThreads, this.dataSource);
Review comment:
same thought about debug level and context about the supervisor it is
referring to
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -1137,6 +1394,20 @@ public void gracefulShutdownInternal() throws
ExecutionException, InterruptedExc
@VisibleForTesting
public void resetInternal(DataSourceMetadata dataSourceMetadata)
{
+ // clear queue for kafka lags
+ if (enableDynamicAllocationTasks && queue != null) {
+ try {
+ lock.lock();
+ queue.clear();
+ }
+ catch (Exception e) {
+ log.warn(e, "Error,when clear queue in rest action");
Review comment:
what are the implications of this failing? we are catching and carrying
on. Can anything negative come from that?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
Review comment:
wondering if this default value should be final constant instantiated at
top of class?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
}
}
+ private Runnable collectAndcollectLags()
+ {
+ return new Runnable() {
+ @Override
+ public void run()
+ {
+ lock.lock();
+ try {
+ if (!spec.isSuspended()) {
+ ArrayList<Long> metricsInfo = new ArrayList<>(3);
+ collectLag(metricsInfo);
+ long totalLags = metricsInfo.size() < 3 ? 0 : metricsInfo.get(1);
+ queue.offer(totalLags > 0 ? totalLags : 0);
+ log.info("Current lag metric points : " + new ArrayList<>(queue));
+ } else {
+ log.info("[%s] supervisor is suspended, skip to collect kafka
lags", dataSource);
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, When collect kafka lags");
Review comment:
should this be warn if we catch and move on?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -768,7 +978,22 @@ public void tryInit()
);
scheduleReporting(reportingExec);
-
+ if (enableDynamicAllocationTasks) {
+ log.info("Collect and compute lags at fixed rate of " +
metricsCollectionIntervalMillis);
Review comment:
include reference to the datasource in this log and the one for the lag
computation executor below. Should they be debug to reduce info level
chattiness?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
Review comment:
logs added in this method should provide context about what supervisor
they refer to. I also think we should evaluate what logs should be changed to
debug too so limit the chattiness of info level
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
Review comment:
does this mean we have not collected enough historical lag data to
decide on scale in/scale out? I think the log can be updated to be more
descriptive since it may not be obvious to log reader why it matters that queue
is not full
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -495,6 +655,12 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable
TaskGroup taskGroup)
private volatile boolean stopped = false;
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ private final boolean enableDynamicAllocationTasks;
+ private volatile long metricsCollectionIntervalMillis;
+ private volatile long metricsCollectionRangeMillis;
+ private volatile long dynamicCheckStartDelayMillis;
+ private volatile long dynamicCheckPeriod;
+ private volatile CircularFifoQueue<Long> queue;
public SeekableStreamSupervisor(
Review comment:
logs in this constructor should include info on the supervisor being
referred to. I think we should also evaluate what can be debug to reduce
chattiness in info level logging.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ // if supervisor is not suspended, ensure required tasks are running
+ // if suspended, ensure tasks have been requested to gracefully stop
+ log.info("[%s] supervisor is running, start to check dynamic allocate task
logic", dataSource);
+ long scaleOutThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
5000000)));
+ long scaleInThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
1000000)));
+ double triggerSaleOutThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
0.3)));
+ double triggerSaleInThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleInThresholdFrequency",
0.8)));
+ int taskCountMax =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
8)));
+ int taskCountMin =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMin",
1)));
+ int scaleInStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInStep",
1)));
+ int scaleOutStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutStep",
2)));
+ int beyond = 0;
+ int within = 0;
+ int metricsCount = lags.size();
+ for (Long lag : lags) {
+ if (lag >= scaleOutThreshold) {
+ beyond++;
+ }
+ if (lag <= scaleInThreshold) {
+ within++;
+ }
+ }
+ double beyondProportion = beyond * 1.0 / metricsCount;
+ double withinProportion = within * 1.0 / metricsCount;
+ log.info("triggerSaleOutThresholdFrequency is [ " +
triggerSaleOutThresholdFrequency + " ] and triggerSaleInThresholdFrequency is [
" + triggerSaleInThresholdFrequency + " ]");
Review comment:
same spelling callout as above
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ // if supervisor is not suspended, ensure required tasks are running
+ // if suspended, ensure tasks have been requested to gracefully stop
+ log.info("[%s] supervisor is running, start to check dynamic allocate task
logic", dataSource);
+ long scaleOutThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
5000000)));
+ long scaleInThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
1000000)));
+ double triggerSaleOutThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
0.3)));
Review comment:
I think these may be spelling mistakes in variable name and config value
for this and next config. `triggerSale*` --> `triggerScale*` ?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
Review comment:
also the logs added should add context about what supervisor is being
logged. I think we should evaluate what logs should be changed to debug too so
limit the chattiness of info level
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
}
}
+ private Runnable collectAndcollectLags()
Review comment:
also, a javadoc would be helpful too if you don't mind
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
Review comment:
should the config defaults be instantiated as final constants at top of
class?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]