kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991179466
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -154,7 +155,7 @@ public synchronized void maybeSetState(State proposedState)
return;
}
- // if we're trying to switch to a healthy steady state (i.e. RUNNING or
SUSPENDED) but haven't had a successful run
+ // if we're trying to switch to a healthy steady state (i.e. RUNNING or
SUSPENDED) or IDLE state but haven't had a successful run
Review Comment:
I don't think this change is applicable since IDLE doesn't count as a
"healthy steady state". IIUC, the `healthySteadyState` can only be RUNNING or
SUSPENDED, which is determined while creating this `SupervisorStateManager`
instance.
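A simplified sketch of that point, assuming the constructor picks the value from the supervisor's suspended flag. The class and enum below are stand-ins for illustration, not the actual `SupervisorStateManager`:

```java
class HealthySteadyStateSketch
{
  // Stand-in for SupervisorStateManager.BasicState; only the relevant values.
  enum BasicState { RUNNING, SUSPENDED, IDLE }

  // Fixed at construction time, so it can never become IDLE later.
  private final BasicState healthySteadyState;

  HealthySteadyStateSketch(boolean suspended)
  {
    // Assumption: the choice depends only on whether the spec is suspended.
    this.healthySteadyState = suspended ? BasicState.SUSPENDED : BasicState.RUNNING;
  }

  BasicState getHealthySteadyState()
  {
    return healthySteadyState;
  }
}
```

Under this assumption the original comment already covers every value `healthySteadyState` can take, so IDLE does not need to be mentioned there.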
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java:
##########
@@ -46,6 +46,10 @@
@JsonProperty
private int maxStoredExceptionEvents = Math.max(unhealthinessThreshold,
healthinessThreshold);
+ //
Review Comment:
Nit: missing comment?
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
Review Comment:
Adding a dependency just for `ImmutableMap` seems like overkill. You can use
`Collections.emptyMap()` instead.
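A minimal sketch of the alternative, where `recordLag` is a hypothetical stand-in for the null guard in `getRecordLagPerPartition` and not the actual method body:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class EmptyMapSketch
{
  // Hypothetical helper; only the null guard mirrors the code under review.
  static Map<Integer, Long> recordLag(Map<Integer, Long> latestSequenceFromStream)
  {
    if (latestSequenceFromStream == null) {
      // Immutable empty map from the JDK, so no extra import or dependency.
      return Collections.emptyMap();
    }
    // Placeholder for the real lag computation: just copy the input.
    return new HashMap<>(latestSequenceFromStream);
  }
}
```

`Collections.emptyMap()` is immutable, like `ImmutableMap.of()`, so callers that only read the result should not notice the difference.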
##########
.travis.yml:
##########
@@ -506,13 +506,13 @@ jobs:
stage: Tests - phase 2
jdk: openjdk8
services: *integration_test_services
- env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8'
USE_INDEXER='middleManager'
+ env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8'
USE_INDEXER='middleManager'
OVERRIDE_CONFIG_PATH='./environment-configs/test-groups/enable-idle-behaviour'
Review Comment:
Nit: It is preferable to have the name of the override config the same as
the test group (`kafka-index` in this case). Otherwise, we would end up having
separate config files for every feature.
You need not create another one for `kafka-transactional-index` though and
can reuse the same one.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
return true;
}
+ protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
Review Comment:
Please add a javadoc to this method.
Should this be renamed to `getLatestSequencesFromStream()`?
##########
integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json:
##########
@@ -0,0 +1,60 @@
+{
Review Comment:
This spec seems pretty generic. Do we really need to add a new one for the
new test or can we reuse any existing one?
##########
extensions-core/kafka-indexing-service/pom.xml:
##########
@@ -134,6 +134,11 @@
<artifactId>validation-api</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
Review Comment:
It doesn't seem like this dependency is needed.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
return true;
}
+ protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
+ {
+ return new HashMap<>();
+ }
+
+ private boolean isIdle()
Review Comment:
This method should be exposed by the `stateManager` itself, since the state
is tracked there anyway and we refer to it for other purposes such as checking
`isSteadyState()` or `isHealthy()`.
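Roughly what this could look like, as a sketch only. The class below is a simplified stand-in for `SupervisorStateManager`, and `setState` is a hypothetical setter used just to drive the example:

```java
class StateManagerSketch
{
  // Stand-in for SupervisorStateManager.BasicState; only a few values.
  enum BasicState { PENDING, RUNNING, SUSPENDED, IDLE }

  private BasicState supervisorState = BasicState.PENDING;

  // Hypothetical setter, present only so the example can change state.
  synchronized void setState(BasicState newState)
  {
    this.supervisorState = newState;
  }

  // The check lives next to the state it inspects, so callers do not
  // compare against BasicState.IDLE themselves.
  synchronized boolean isIdle()
  {
    return BasicState.IDLE.equals(supervisorState);
  }
}
```

The supervisor would then call `stateManager.isIdle()` in the same way it already calls the other state checks.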
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
return true;
}
+ protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
+ {
+ return new HashMap<>();
+ }
+
+ private boolean isIdle()
+ {
+ return SupervisorStateManager.BasicState.IDLE.equals(getState());
+ }
+
private void assignRecordSupplierToPartitionIds()
{
recordSupplierLock.lock();
try {
- final Set partitions = partitionIds.stream()
+ final Set<StreamPartition<PartitionIdType>> partitions =
partitionIds.stream()
Review Comment:
Thanks! :)
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -199,8 +200,9 @@ public void markRunFinished()
// Try to set the state to RUNNING or SUSPENDED. This will be rejected if
we haven't had atLeastOneSuccessfulRun
// (in favor of the more specific states for the initial run) and will
instead trigger setting the state to an
// unhealthy one if we are now over the error thresholds.
- maybeSetState(healthySteadyState);
-
+ if (!BasicState.IDLE.equals(supervisorState)) {
Review Comment:
Nit: Once `isIdle()` is moved to this class, use that here instead.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ // snapshots latest sequences from stream to be verified in next run cycle
of inactive stream check
+ protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType>
previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();
Review Comment:
This should be private if it doesn't need to be accessed elsewhere.
It can be an ordinary map as it will only ever be accessed from
`runInternal`, which is invoked on a single thread.
You could also consider renaming it to something like
`previousSequencesFromStream` to correspond with `latestSequencesFromStream`.
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -240,6 +242,11 @@ protected boolean isStoreStackTrace()
return supervisorStateManagerConfig.isStoreStackTrace();
}
+ public boolean isEnableIdleBehavior()
Review Comment:
Is this used anywhere? `SeekableStreamSupervisor` seems to be directly
checking the `SupervisorStateManagerConfig` itself for this field.
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -199,8 +200,9 @@ public void markRunFinished()
// Try to set the state to RUNNING or SUSPENDED. This will be rejected if
we haven't had atLeastOneSuccessfulRun
Review Comment:
```suggestion
// If the supervisor is not IDLE, try to set the state to RUNNING or
SUSPENDED.
// This will be rejected if we haven't had atLeastOneSuccessfulRun
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3977,7 +4041,7 @@ protected void emitNoticesQueueSize()
protected void emitLag()
{
- if (spec.isSuspended() || !stateManager.isSteadyState()) {
+ if (spec.isSuspended() || !stateManager.isSteadyState() && !isIdle()) {
Review Comment:
For readability:
```suggestion
if (spec.isSuspended() || (!stateManager.isSteadyState() && !isIdle())) {
```
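The parentheses do not change behaviour, since `&&` binds more tightly than `||` in Java. A small sketch to confirm this, using hypothetical boolean parameters in place of `spec.isSuspended()`, `stateManager.isSteadyState()` and `isIdle()`:

```java
class PrecedenceSketch
{
  // Condition as written in the PR: relies on && binding tighter than ||.
  static boolean implicit(boolean suspended, boolean steadyState, boolean idle)
  {
    return suspended || !steadyState && !idle;
  }

  // Condition as suggested: the same grouping, made explicit.
  static boolean explicit(boolean suspended, boolean steadyState, boolean idle)
  {
    return suspended || (!steadyState && !idle);
  }

  // Compares both forms over all eight input combinations.
  static boolean allCombinationsAgree()
  {
    final boolean[] values = {false, true};
    for (boolean suspended : values) {
      for (boolean steadyState : values) {
        for (boolean idle : values) {
          if (implicit(suspended, steadyState, idle) != explicit(suspended, steadyState, idle)) {
            return false;
          }
        }
      }
    }
    return true;
  }
}
```

So the suggestion is purely about making the grouping obvious to the reader.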
##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is
enabled):
|`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task
failures before the supervisor is considered unhealthy.|3|
|`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor
exceptions should be stored and returned by the supervisor `/status`
endpoint.|false|
|`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception
events that can be returned through the supervisor `/status`
endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|If enabled, Supervisor can become idle
if there is no data on input stream/topic for some time. This time can be
configured via `awaitStreamInactiveMillis`. It can also be turned off per
Supervisor.|false|
Review Comment:
```suggestion
|`druid.supervisor.enableIdleBehaviour`|Whether a supervisor should become
idle and stop creating new tasks if there has been no new data on the input
stream for a specified period of time.|false|
```
Also add a separate entry for `awaitStreamInactiveMillis`.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -274,17 +269,19 @@ protected Map<Integer, Long> getPartitionTimeLag()
@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long>
currentOffsets)
{
- return currentOffsets
+ if (latestSequenceFromStream == null) {
+ return ImmutableMap.of();
+ }
+
+ return latestSequenceFromStream
.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
- e -> latestSequenceFromStream != null
- && latestSequenceFromStream.get(e.getKey()) != null
- && e.getValue() != null
- ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
- : Integer.MIN_VALUE
+ e -> e.getValue() != null
Review Comment:
Looks much cleaner now.
I hope this doesn't break any assumption where we expect `Integer.MIN_VALUE`
for partitions that are missing in the `latestSequenceFromStream`, if any.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -249,15 +252,7 @@ protected Map<Integer, Long> getPartitionRecordLag()
if (latestSequenceFromStream == null) {
return null;
}
-
- if
(!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
Review Comment:
Super nit: Any particular reason to move this? The method seems small enough
to keep this check here.
##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for
the Apache Kafka sup
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than
*2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there
is no data on input stream/topic for some time. This can only be enabled if
Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default
== false)|
+|`awaitStreamInactiveMillis`|Long|Minimum time interval to wait before a topic
is considered inactive. (i.e. all existing data has been read from the stream
and the topic is not getting new data).| no (default == 60000) |
Review Comment:
```suggestion
|`awaitStreamInactiveMillis`|Long|Minimum time interval to wait after the
topic has become inactive before marking the supervisor as idle. A topic is
considered to be inactive if all existing data has been read from it and no new
data is being published to it.| no (default == 60000) |
```
##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for
the Apache Kafka sup
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than
*2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there
is no data on input stream/topic for some time. This can only be enabled if
Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default
== false)|
Review Comment:
```suggestion
|`enableIdleBehaviour`|Boolean|Configure Kafka supervisor to become idle and
not create any more tasks if there has been no data on the input topic for some
time. This takes effect only if overlord config
`druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ // snapshots latest sequences from stream to be verified in next run cycle
of inactive stream check
+ protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType>
previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();
+ private long activeLastTime;
Review Comment:
```suggestion
private long lastActiveTimeMillis;
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ // snapshots latest sequences from stream to be verified in next run cycle
of inactive stream check
+ protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType>
previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();
+ private long activeLastTime;
+ private long idleTime;
Review Comment:
This should be a local variable and not a member field.
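A sketch of the shape this could take, assuming the idle duration is derived from the last active timestamp on every check. The method name `shouldGoIdle` and its parameters are hypothetical and do not come from the PR:

```java
class IdleCheckSketch
{
  // Must survive across run cycles, so it stays a member field.
  private long lastActiveTimeMillis;

  // Hypothetical check, called once per run cycle.
  boolean shouldGoIdle(boolean streamActive, long nowMillis, long awaitStreamInactiveMillis)
  {
    if (streamActive) {
      lastActiveTimeMillis = nowMillis;
      return false;
    }
    // Recomputed on every call, so a local variable is enough.
    final long idleTimeMillis = nowMillis - lastActiveTimeMillis;
    return idleTimeMillis >= awaitStreamInactiveMillis;
  }
}
```

Only the timestamp carries state between invocations; the idle duration is always a function of it and the current time.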
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3648,25 +3704,33 @@ private void updateCurrentOffsets() throws
InterruptedException, ExecutionExcept
protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
Review Comment:
Please add a javadoc to this method and maybe explain how this change
affects the idle behaviour.
##########
docs/development/extensions-core/kafka-supervisor-operations.md:
##########
@@ -56,6 +56,7 @@ The list of `detailedState` values and their corresponding
`state` mapping is as
|DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is
discovering already-running tasks|
|CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating
tasks and discovering state|
|RUNNING|RUNNING|The supervisor has started tasks and is waiting for
taskDuration to elapse|
+|IDLE|IDLE|The supervisor is not creating tasks any longer since stream is
idle|
Review Comment:
```suggestion
|IDLE|IDLE|The supervisor is not creating tasks since the input stream has
not received any new data|
```
##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for
the Apache Kafka sup
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than
*2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there
is no data on input stream/topic for some time. This can only be enabled if
Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default
== false)|
Review Comment:
Shouldn't this default be `true`?
If I have enabled the idle behaviour at the overlord, shouldn't all
supervisors be allowed to go into idle state by default?
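To illustrate the semantics I have in mind, here is a sketch assuming the effective setting is the overlord flag combined with an optional per-supervisor override. The helper `isIdleAllowed` is hypothetical:

```java
class IdleConfigSketch
{
  // overlordEnabled: druid.supervisor.enableIdleBehaviour
  // supervisorEnabled: per-supervisor enableIdleBehaviour, null when unset
  static boolean isIdleAllowed(boolean overlordEnabled, Boolean supervisorEnabled)
  {
    // Assumption: an unset supervisor-level value defaults to true.
    final boolean supervisorSetting = supervisorEnabled == null || supervisorEnabled;
    return overlordEnabled && supervisorSetting;
  }
}
```

With a default of `true`, enabling the overlord config is enough for every supervisor, and an individual supervisor can still opt out by setting its own flag to `false`.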