This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3e13584e0e Adds Idle feature to `SeekableStreamSupervisor` for inactive stream (#13144)
3e13584e0e is described below
commit 3e13584e0ec18f2860e6817baf90bf44343769d9
Author: Tejaswini Bandlamudi
AuthorDate: Wed Oct 12 18:31:08 2022 +0530
Adds Idle feature to `SeekableStreamSupervisor` for inactive stream (#13144)
* Idle Seekable stream supervisor changes.
* nit
* nit
* nit
* Adds unit tests
* Supervisor decides its idle state instead of AutoScaler
* docs update
* nit
* nit
* docs update
* Adds Kafka unit test
* Adds Kafka Integration test.
* Updates travis config.
* Updates kafka-indexing-service dependencies.
* updates previous offsets snapshot & doc
* Doesn't act if supervisor is suspended.
* Fixes highest current offsets fetch bug, adds new Kafka UT tests, doc changes.
* Reverts Kinesis Supervisor idle behaviour changes.
* nit
* nit
* Corrects SeekableStreamSupervisorSpec check on idle behaviour config, adds tests.
* Fixes getHighestCurrentOffsets to fetch offsets of publishing tasks too
* Adds Kafka Supervisor UT
* Improves test coverage in druid-server
* Corrects IT override config
* Doc updates and Syntactic changes
* nit
* supervisorSpec.ioConfig.idleConfig changes
---
.../extensions-core/kafka-supervisor-operations.md | 5 +-
.../extensions-core/kafka-supervisor-reference.md | 18 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 22 +-
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 8 +-
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 2 +
.../supervisor/KafkaSupervisorIOConfigTest.java | 38 ++
.../kafka/supervisor/KafkaSupervisorTest.java | 470 ++++++++++++++++++++-
.../supervisor/KinesisSupervisorIOConfig.java | 3 +-
.../seekablestream/supervisor/IdleConfig.java | 68 +++
.../supervisor/SeekableStreamSupervisor.java | 96 ++++-
.../SeekableStreamSupervisorIOConfig.java | 13 +-
.../SeekableStreamSupervisorSpecTest.java | 84 +++-
.../SeekableStreamSupervisorStateManagerTest.java | 37 ++
.../SeekableStreamSupervisorStateTest.java | 97 ++++-
.../tests/indexer/AbstractStreamIndexingTest.java | 83 +++-
...ingServiceNonTransactionalParallelizedTest.java | 6 +
..._with_idle_behaviour_enabled_spec_template.json | 62 +++
.../supervisor/SupervisorStateManager.java | 19 +-
.../supervisor/SupervisorStateManagerTest.java | 44 ++
19 files changed, 1128 insertions(+), 47 deletions(-)
diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md b/docs/development/extensions-core/kafka-supervisor-operations.md
index ffd4135d2b..fe8d1f562b 100644
--- a/docs/development/extensions-core/kafka-supervisor-operations.md
+++ b/docs/development/extensions-core/kafka-supervisor-operations.md
@@ -56,6 +56,7 @@ The list of `detailedState` values and their corresponding
`state` mapping is as
|DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is
discovering already-running tasks|
|CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating
tasks and discovering state|
|RUNNING|RUNNING|The supervisor has started tasks and is waiting for
taskDuration to elapse|
+|IDLE|IDLE|The supervisor is not creating tasks because the input stream has not received any new data and all existing data has been read.|
|SUSPENDED|SUSPENDED|The supervisor has been suspended|
|STOPPING|STOPPING|The supervisor is stopping|
@@ -68,14 +69,14 @@ On each iteration of the supervisor's run loop, the
supervisor completes the fol
4) Handle tasks that have exceeded `taskDuration` and should transition from
the reading to publishing state.
5) Handle tasks that have finished publishing and signal redundant replica
tasks to stop.
6) Handle tasks that have failed and clean up the supervisor's internal
state.
- 7) Compare the list of healthy tasks to the requested `taskCount` and
`replicas` configurations and create additional tasks if required.
+ 7) Compare the list of healthy tasks to the requested `taskCount` and `replicas` configurations and create additional tasks if required, unless the supervisor is idle.
The `detailedState` field will show additional values (those marked with
"first iteration only") the first time the
supervisor executes this run loop after startup or after resuming from a
suspension. This is intended to surface
initialization-type issues, where the supervisor is unable to reach a stable
state (perhaps because it can't connect to
Kafka, it can't read from the Kafka topic, or it can't communicate with
existing tasks). Once the supervisor is stable -
that is, once it has completed a full execution without encountering any
issues - `detailedState` will show a `RUNNING`
-state until it is stopped, suspended, or hits a task failure threshold and
transitions to an unhealthy state.
+state until it is idle, stopped, suspended, or hits a task failure threshold
and transitions to an unhealthy state.
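Step 7 of the run loop, with the idle check this commit adds, can be sketched roughly as follows. This is a hypothetical simplification for illustration only: `TaskCreationSketch` and `tasksToCreate` are invented names, and the sketch assumes the requested total is `taskCount` task groups multiplied by `replicas` tasks per group, ignoring the per-group bookkeeping the real supervisor performs.

```java
// Hypothetical sketch of run-loop step 7; not the SeekableStreamSupervisor code.
final class TaskCreationSketch {
  private TaskCreationSketch() {
  }

  /**
   * Returns how many additional tasks to create. Assumes the requested total
   * is taskCount task groups times replicas tasks per group.
   */
  static int tasksToCreate(int healthyTasks, int taskCount, int replicas, boolean idle) {
    if (idle) {
      // An idle supervisor does not create additional tasks.
      return 0;
    }
    int requestedTasks = taskCount * replicas;
    // Never return a negative count when more healthy tasks exist than requested.
    return Math.max(0, requestedTasks - healthyTasks);
  }
}
```

For example, with `taskCount` 2, `replicas` 2 and no healthy tasks, the sketch asks for 4 tasks while running and 0 while idle.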
## Getting Supervisor Ingestion Stats Report
diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md
index 01a61fe403..f1153c7112 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -52,6 +52,7 @@ This topic contains configuration reference information for
the Apache Kafka sup
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime an [...]
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageReject [...]
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest
tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no
(default == null)|
+|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)|
## Task Autoscaler Properties
@@ -79,7 +80,16 @@ This topic contains configuration reference information for
the Apache Kafka sup
| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
-The following example demonstrates supervisor spec with `lagBased` autoScaler
enabled:
+## Idle Supervisor Configuration
+
+> Note that idle state transitioning is currently designated as experimental.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enabled` | If `true`, the Kafka supervisor becomes idle when all existing data has been read and the input stream/topic has received no new data for `inactiveAfterMillis` milliseconds. | no (default == false) |
+| `inactiveAfterMillis` | The supervisor is marked as idle if all existing data has been read from the input topic and no new data has been published for `inactiveAfterMillis` milliseconds. | no (default == 600000) |
+
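To make the defaults in the table concrete, here is a minimal Java sketch of a value object holding the two properties. `IdleConfigSketch` is a hypothetical stand-in, not the `org.apache.druid.indexing.seekablestream.supervisor.IdleConfig` class this commit adds; it only assumes that an omitted property falls back to the documented default (`enabled` = `false`, `inactiveAfterMillis` = 600000).

```java
// Hypothetical stand-in for the idleConfig object; not Druid's IdleConfig class.
final class IdleConfigSketch {
  // 10 minutes, the documented default for inactiveAfterMillis.
  static final long DEFAULT_INACTIVE_AFTER_MILLIS = 600_000L;

  private final boolean enabled;
  private final long inactiveAfterMillis;

  // Null arguments model omitted JSON properties and fall back to the defaults.
  IdleConfigSketch(Boolean enabled, Long inactiveAfterMillis) {
    this.enabled = enabled != null && enabled;
    this.inactiveAfterMillis =
        inactiveAfterMillis != null ? inactiveAfterMillis : DEFAULT_INACTIVE_AFTER_MILLIS;
  }

  boolean isEnabled() {
    return enabled;
  }

  long getInactiveAfterMillis() {
    return inactiveAfterMillis;
  }
}
```

This mirrors the serde test further down, where an `idleConfig` containing only `"enabled": true` is expected to come back with an `inactiveAfterMillis` of 600000.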
+The following example demonstrates a supervisor spec with the `lagBased` autoscaler and idle config enabled:
```json
{
"type": "kafka",
@@ -114,7 +124,11 @@ The following example demonstrates supervisor spec with
`lagBased` autoScaler en
},
"taskCount":1,
"replicas":1,
- "taskDuration":"PT1H"
+ "taskDuration":"PT1H",
+ "idleConfig": {
+ "enabled": true,
+ "inactiveAfterMillis": 600000
+ }
},
"tuningConfig":{
...
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 9bac92429a..b5ecde675e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -62,9 +62,11 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -274,17 +276,19 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<Integer, Long, Kaf
@SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
{
- return currentOffsets
+ if (latestSequenceFromStream == null) {
+ return Collections.emptyMap();
+ }
+
+ return latestSequenceFromStream
.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
- e -> latestSequenceFromStream != null
- && latestSequenceFromStream.get(e.getKey()) != null
- && e.getValue() != null
- ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
- : Integer.MIN_VALUE
+ e -> e.getValue() != null
+ ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
+ : 0
)
);
}
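The revised `getRecordLagPerPartition` iterates over the latest stream offsets instead of the tasks' current offsets, so a partition missing from `currentOffsets` is reported with its full latest offset as lag. The standalone sketch below restates that logic over plain maps so it can run outside Druid; `RecordLagSketch` is a hypothetical name, and its two parameters stand in for `latestSequenceFromStream` and `currentOffsets` (assumed non-null here).

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical standalone restatement of the lag calculation in the hunk above.
final class RecordLagSketch {
  private RecordLagSketch() {
  }

  static Map<Integer, Long> recordLagPerPartition(
      Map<Integer, Long> latestFromStream,
      Map<Integer, Long> currentOffsets // assumed non-null
  ) {
    // No snapshot of the stream yet: report no lag at all.
    if (latestFromStream == null) {
      return Collections.emptyMap();
    }
    return latestFromStream
        .entrySet()
        .stream()
        .collect(
            Collectors.toMap(
                Map.Entry::getKey,
                // A partition without a current offset is treated as read from offset 0.
                e -> e.getValue() != null
                     ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
                     : 0L
            )
        );
  }
}
```

With latest offsets `{0=100, 1=50}` and current offsets `{0=40}`, the sketch reports a lag of 60 for partition 0 and 50 for partition 1.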
@@ -383,6 +387,12 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<Integer, Long, Kaf
}
}
+ @Override
+ protected Map<Integer, Long> getLatestSequencesFromStream()
+ {
+ return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
+ }
+
@Override
protected String baseTaskName()
{
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 2d6a800942..f467cc5510 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
@@ -63,7 +64,8 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
- @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides
+ @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
+ @JsonProperty("idleConfig") IdleConfig idleConfig
)
{
super(
@@ -79,7 +81,8 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
- lateMessageRejectionStartDateTime
+ lateMessageRejectionStartDateTime,
+ idleConfig
);
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
@@ -140,6 +143,7 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
", configOverrides=" + getConfigOverrides() +
+ ", idleConfig=" + getIdleConfig() +
'}';
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 89b824778d..abe25b5391 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -144,6 +144,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
null
),
null,
@@ -320,6 +321,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
null
),
null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5b231deb31..b6a95769b8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -305,6 +306,7 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
+ null,
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
@@ -318,4 +320,40 @@ public class KafkaSupervisorIOConfigTest
kafkaSupervisorIOConfig1.getAutoScalerConfig().getMinTriggerScaleActionFrequencyMillis()
);
}
+
+ @Test
+ public void testIdleConfigSerde() throws JsonProcessingException
+ {
+ HashMap<String, Object> idleConfig = new HashMap<>();
+ idleConfig.put("enabled", true);
+
+ final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
+ consumerProperties.put("bootstrap.servers", "localhost:8082");
+
+ KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+ "test",
+ null,
+ 1,
+ 1,
+ new Period("PT1H"),
+ consumerProperties,
+ null,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ new Period("P1D"),
+ new Period("PT30S"),
+ true,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ mapper.convertValue(idleConfig, IdleConfig.class)
+ );
+ String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
+ KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
+
+ Assert.assertNotNull(kafkaSupervisorIOConfig1.getIdleConfig());
+ Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled());
+ Assert.assertEquals(600000L, kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis());
+ }
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 9009fd80d3..c30795cdd1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -67,6 +67,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.St
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
@@ -273,7 +274,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
autoScalerConfig.put("scaleInThreshold", 1000000);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
- autoScalerConfig.put("scaleActionPeriodMillis", 100);
+ autoScalerConfig.put("scaleActionPeriodMillis", 600);
autoScalerConfig.put("taskCountMax", 2);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
@@ -300,7 +301,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
- null
+ null,
+ new IdleConfig(true, 1000L)
);
final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig(
@@ -380,7 +382,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
- EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
@@ -389,11 +391,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(1, taskCountBeforeScale);
autoscaler.start();
supervisor.runInternal();
- Thread.sleep(1 * 1000);
+ Thread.sleep(1000);
+ supervisor.runInternal();
verifyAll();
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
+ Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
KafkaIndexTask task = captured.getValue();
@@ -1855,6 +1859,425 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
}
+ @Test
+ public void testSupervisorIsIdleIfStreamInactive() throws Exception
+ {
+ supervisor = getTestableSupervisorForIdleBehaviour(
+ 1,
+ 2,
+ true,
+ "PT10S",
+ null,
+ null,
+ false,
+ new IdleConfig(true, 200L)
+ );
+
+ addSomeEvents(100);
+
+ final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
+
+ Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(0, 10L, 2, 30L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(1, 20L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(1, Long.MAX_VALUE)
+ ),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ List<Task> existingTasks = ImmutableList.of(id1, id2);
+
+ Collection workItems = new ArrayList<>();
+ workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+ workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
+ EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+ EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync("id1"))
+ .andReturn(Futures.immediateFuture(Status.READING))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync("id2"))
+ .andReturn(Futures.immediateFuture(Status.READING))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 0,
+ ImmutableMap.of(0, 0L, 2, 0L),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("id1"),
+ ImmutableSet.of()
+ );
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 1,
+ ImmutableMap.of(1, 0L),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("id2"),
+ ImmutableSet.of()
+ );
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 45L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 45L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Assert.assertNotEquals(supervisor.getState(), SupervisorStateManager.BasicState.IDLE);
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
+ }
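The test above drives the supervisor through a sequence of offset snapshots: lagging task offsets first, then offsets that have caught up with a stream whose latest offsets no longer change. A minimal sketch of an idle check with that shape follows. It is an illustration under stated assumptions, not the logic in `SeekableStreamSupervisor`: `IdleDetectorSketch` is hypothetical, time is passed in explicitly rather than read from a clock, and it assumes the supervisor counts as idle once the stream's latest offsets have stayed unchanged, with zero total lag, for at least `inactiveAfterMillis`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical idle detector; not the SeekableStreamSupervisor implementation.
final class IdleDetectorSketch {
  private final long inactiveAfterMillis;
  // Latest stream offsets seen on the previous call.
  private final Map<Integer, Long> previousLatest = new HashMap<>();
  // Last time the stream changed or tasks were still lagging; -1 before the first call.
  private long lastActiveMillis = -1L;

  IdleDetectorSketch(long inactiveAfterMillis) {
    this.inactiveAfterMillis = inactiveAfterMillis;
  }

  /** Returns true if the supervisor should be considered idle at nowMillis. */
  boolean update(Map<Integer, Long> latestFromStream, Map<Integer, Long> currentOffsets, long nowMillis) {
    boolean streamUnchanged = previousLatest.equals(latestFromStream);
    boolean caughtUp = totalLag(latestFromStream, currentOffsets) == 0L;
    previousLatest.clear();
    previousLatest.putAll(latestFromStream);

    if (lastActiveMillis < 0 || !streamUnchanged || !caughtUp) {
      // New data arrived or is still being read: restart the inactivity window.
      lastActiveMillis = nowMillis;
      return false;
    }
    return nowMillis - lastActiveMillis >= inactiveAfterMillis;
  }

  // Sum of per-partition lag, clamped at 0 per partition.
  static long totalLag(Map<Integer, Long> latest, Map<Integer, Long> current) {
    long lag = 0L;
    for (Map.Entry<Integer, Long> e : latest.entrySet()) {
      lag += Math.max(0L, e.getValue() - current.getOrDefault(e.getKey(), 0L));
    }
    return lag;
  }
}
```

Any change in the stream's latest offsets resets the window, so a single new record is enough to leave the idle condition in this sketch.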
+
+ @Test
+ public void testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasks() throws Exception
+ {
+ supervisor = getTestableSupervisorForIdleBehaviour(
+ 1,
+ 2,
+ true,
+ "PT10S",
+ null,
+ null,
+ false,
+ new IdleConfig(true, 200L)
+ );
+ addSomeEvents(1);
+
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
+ )
+ ).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+ verifyAll();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
+ }
+
+ @Test
+ public void testSupervisorNotIdleIfStreamInactiveWhenSuspended() throws Exception
+ {
+ supervisor = getTestableSupervisorForIdleBehaviour(
+ 1,
+ 2,
+ true,
+ "PT10S",
+ null,
+ null,
+ true,
+ new IdleConfig(true, 200L)
+ );
+ addSomeEvents(1);
+
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<Integer, Long>(topic, ImmutableMap.of(0, 2L, 1, 2L, 2, 2L))
+ )
+ ).anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.SUSPENDED, supervisor.getState());
+ }
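The suspended-supervisor test checks that suspension wins over an inactive stream. A small sketch of that precedence follows, with hypothetical names (`StateResolutionSketch`, `resolve`) and a reduced state set borrowed from the `BasicState` values used in these tests; the real state manager tracks more states than these three.

```java
// Hypothetical precedence sketch; not Druid's SupervisorStateManager.
final class StateResolutionSketch {
  enum State { RUNNING, IDLE, SUSPENDED }

  private StateResolutionSketch() {
  }

  static State resolve(boolean suspended, boolean idleEnabled, boolean streamInactive) {
    if (suspended) {
      // A suspended supervisor stays SUSPENDED even if the stream is inactive.
      return State.SUSPENDED;
    }
    if (idleEnabled && streamInactive) {
      return State.IDLE;
    }
    return State.RUNNING;
  }
}
```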
+
+ @Test
+ public void testSupervisorIsIdleIfStreamInactiveWhenNoActiveTasksAndFewPendingTasks() throws Exception
+ {
+ supervisor = getTestableSupervisorForIdleBehaviour(
+ 1,
+ 2,
+ true,
+ "PT10S",
+ null,
+ null,
+ false,
+ new IdleConfig(true, 200L)
+ );
+
+ addSomeEvents(100);
+
+ final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
+ final TaskLocation location2 = new TaskLocation("testHost", 234, -1);
+
+ Task id1 = createKafkaIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(0, 10L, 2, 30L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ Task id2 = createKafkaIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(1, 20L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ ImmutableMap.of(1, Long.MAX_VALUE)
+ ),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ List<Task> existingTasks = ImmutableList.of(id1, id2);
+
+ Collection workItems = new ArrayList<>();
+ workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+ workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
+ EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+ EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+ EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync("id1"))
+ .andReturn(Futures.immediateFuture(Status.READING))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync("id2"))
+ .andReturn(Futures.immediateFuture(Status.READING))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+
+ replayAll();
+
+ supervisor.start();
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 0,
+ ImmutableMap.of(0, 0L, 2, 0L),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("id1"),
+ ImmutableSet.of()
+ );
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 1,
+ ImmutableMap.of(1, 0L),
+ Optional.absent(),
+ Optional.absent(),
+ ImmutableSet.of("id2"),
+ ImmutableSet.of()
+ );
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 25L, 2, 45L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 45L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
+
+ supervisor.moveTaskGroupToPendingCompletion(0);
+ supervisor.moveTaskGroupToPendingCompletion(1);
+
+ Assert.assertEquals(0, supervisor.getActiveTaskGroupsCount());
+
+ EasyMock.reset(taskClient);
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 101L, 2, 101L)));
+ EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(1, 101L)));
+
+ EasyMock.replay(taskClient);
+
+ Thread.sleep(100);
+ supervisor.updateCurrentAndLatestOffsets();
+ supervisor.runInternal();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState());
+ }
+
@Test
public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
{
@@ -3595,7 +4018,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
false,
- kafkaHost
+ kafkaHost,
+ null
);
}
@@ -3619,7 +4043,33 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
suspended,
- kafkaHost
+ kafkaHost,
+ null
+ );
+ }
+
+ private TestableKafkaSupervisor getTestableSupervisorForIdleBehaviour(
+ int replicas,
+ int taskCount,
+ boolean useEarliestOffset,
+ String duration,
+ Period lateMessageRejectionPeriod,
+ Period earlyMessageRejectionPeriod,
+ boolean suspended,
+ IdleConfig idleConfig
+ )
+ {
+ return getTestableSupervisor(
+ replicas,
+ taskCount,
+ useEarliestOffset,
+ false,
+ duration,
+ lateMessageRejectionPeriod,
+ earlyMessageRejectionPeriod,
+ suspended,
+ kafkaHost,
+ idleConfig
);
}
@@ -3632,7 +4082,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended,
- String kafkaHost
+ String kafkaHost,
+ IdleConfig idleConfig
)
{
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
@@ -3654,7 +4105,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
- null
+ null,
+ idleConfig
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
@@ -3767,6 +4219,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
+ null,
null
);
@@ -3884,6 +4337,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
+ null,
null
);
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index 220c22fcf4..38d92fb880 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -90,7 +90,8 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
- lateMessageRejectionStartDateTime
+ lateMessageRejectionStartDateTime,
+ null
);
this.endpoint = endpoint != null
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
new file mode 100644
index 0000000000..6b017399dd
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+
+public class IdleConfig
+{
+ private final boolean enabled;
+ private final long inactiveAfterMillis;
+
+ @JsonCreator
+ public IdleConfig(
+ @Nullable @JsonProperty("enabled") Boolean enabled,
+ @Nullable @JsonProperty("inactiveAfterMillis") Long inactiveAfterMillis
+ )
+ {
+ this.enabled = enabled != null && enabled;
+ this.inactiveAfterMillis = inactiveAfterMillis != null
+ ? inactiveAfterMillis
+ : 600_000L;
+
+ Preconditions.checkArgument(this.inactiveAfterMillis > 0,
+ "inactiveAfterMillis should be a positive number");
+ }
+
+ @JsonProperty
+ public boolean isEnabled()
+ {
+ return this.enabled;
+ }
+
+ @JsonProperty
+ public long getInactiveAfterMillis()
+ {
+ return this.inactiveAfterMillis;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "idleConfig{" +
+ "enabled=" + enabled +
+ ", inactiveAfterMillis=" + inactiveAfterMillis +
+ '}';
+ }
+}
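A minimal sketch of how `IdleConfig` resolves its two properties, assuming the Jackson annotations are dropped so it runs on the plain JDK; `IdleConfigSketch` is a hypothetical name, not a Druid class. A missing `enabled` leaves the idle behaviour off, a missing `inactiveAfterMillis` falls back to 600000 ms (10 minutes), and a non-positive value is rejected with an `IllegalArgumentException`, which is what Guava's `Preconditions.checkArgument` throws.

```java
/**
 * Hypothetical, dependency-free stand-in for IdleConfig: same defaults and
 * validation, without the Jackson annotations.
 */
final class IdleConfigSketch
{
  static final long DEFAULT_INACTIVE_AFTER_MILLIS = 600_000L; // 10 minutes

  private final boolean enabled;
  private final long inactiveAfterMillis;

  IdleConfigSketch(Boolean enabled, Long inactiveAfterMillis)
  {
    // An absent "enabled" property (null) means the idle behaviour is off.
    this.enabled = enabled != null && enabled;
    // An absent "inactiveAfterMillis" property (null) falls back to 10 minutes.
    this.inactiveAfterMillis = inactiveAfterMillis != null
                               ? inactiveAfterMillis
                               : DEFAULT_INACTIVE_AFTER_MILLIS;
    // Same contract as Preconditions.checkArgument: IllegalArgumentException.
    if (this.inactiveAfterMillis <= 0) {
      throw new IllegalArgumentException("inactiveAfterMillis should be a positive number");
    }
  }

  boolean isEnabled()
  {
    return enabled;
  }

  long getInactiveAfterMillis()
  {
    return inactiveAfterMillis;
  }
}
```

Under these assumptions, `new IdleConfigSketch(true, null)` is enabled with a 600000 ms threshold, the same values the spec test in this diff asserts for `new IdleConfig(true, null)`.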
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1093584d11..586c0cb1e7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -743,6 +743,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
+ private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
+ private long lastActiveTimeMillis;
+
public SeekableStreamSupervisor(
final String supervisorId,
final TaskStorage taskStorage,
@@ -1454,6 +1458,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
checkCurrentTaskState();
+ checkIfStreamInactiveAndTurnSupervisorIdle();
+
// If supervisor is already stopping, don't contend for stateChangeLock since the block can be skipped
if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) {
generateAndLogReport();
@@ -1466,6 +1472,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) {
// if we're already terminating, don't do anything here, the terminate already handles shutdown
log.info("[%s] supervisor is already stopping.", dataSource);
+ } else if (stateManager.isIdle()) {
+ log.info("[%s] supervisor is idle.", dataSource);
} else if (!spec.isSuspended()) {
log.info("[%s] supervisor is running.", dataSource);
@@ -2502,11 +2510,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return true;
}
+ /**
+ * gets mapping of partitions in stream to their latest offsets.
+ */
+ protected Map<PartitionIdType, SequenceOffsetType> getLatestSequencesFromStream()
+ {
+ return new HashMap<>();
+ }
+
private void assignRecordSupplierToPartitionIds()
{
recordSupplierLock.lock();
try {
- final Set partitions = partitionIds.stream()
+ final Set<StreamPartition<PartitionIdType>> partitions = partitionIds.stream()
.map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId))
.collect(Collectors.toSet());
if (!recordSupplier.getAssignment().containsAll(partitions)) {
@@ -3244,6 +3260,44 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
+ private void checkIfStreamInactiveAndTurnSupervisorIdle()
+ {
+ IdleConfig idleConfig = spec.getIoConfig().getIdleConfig();
+ if ((idleConfig == null || !idleConfig.isEnabled()) || spec.isSuspended()) {
+ return;
+ }
+
+ Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequencesFromStream();
+ long nowTime = Instant.now().toEpochMilli();
+ boolean idle;
+ long idleTime;
+
+ if (lastActiveTimeMillis > 0
+ && previousSequencesFromStream.equals(latestSequencesFromStream)
+ && computeTotalLag() == 0) {
+ idleTime = nowTime - lastActiveTimeMillis;
+ idle = true;
+ } else {
+ idleTime = 0L;
+ lastActiveTimeMillis = nowTime;
+ idle = false;
+ }
+
+ previousSequencesFromStream.clear();
+ previousSequencesFromStream.putAll(latestSequencesFromStream);
+ if (!idle) {
+ stateManager.maybeSetState(SupervisorStateManager.BasicState.RUNNING);
+ } else if (!stateManager.isIdle() && idleTime > idleConfig.getInactiveAfterMillis()) {
+ stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE);
+ }
+ }
+
+ private long computeTotalLag()
+ {
+ LagStats lagStats = computeLagStats();
+ return lagStats != null ? lagStats.getTotalLag() : 0;
+ }
+
/**
* If the seekable stream system supported by this supervisor allows for partition expiration, expired partitions
* should be removed from the starting offsets sent to the tasks.
@@ -3595,7 +3649,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// if we aren't in a steady state, chill out for a bit, don't worry, we'll get called later, but if we aren't
// healthy go ahead and try anyway to try if possible to provide insight into how much time is left to fix the
// issue for cluster operators since this feeds the lag metrics
- if (stateManager.isSteadyState() || !stateManager.isHealthy()) {
+ if (stateManager.isIdle() || stateManager.isSteadyState() || !stateManager.isHealthy()) {
try {
updateCurrentOffsets();
updatePartitionLagFromStream();
@@ -3646,27 +3700,41 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Nullable
protected abstract Map<PartitionIdType, Long> getPartitionTimeLag();
+ /**
+ * Gets highest current offsets of all the tasks (actively reading and publishing) for all partitions of the stream.
+ * In case if no task is reading for a partition, returns offset stored in metadata storage for that partition.
+ * In case of no active and publishing task groups, returns offsets stored in metadata storage.
+ * Used to compute lag by comparing with latest offsets from stream for reporting and determining idleness.
+ */
protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
{
+ Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadataStorage = getOffsetsFromMetadataStorage();
if (!spec.isSuspended()) {
if (activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) {
- return activelyReadingTaskGroups
- .values()
- .stream()
- .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
- .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
- .collect(Collectors.toMap(
+ Map<PartitionIdType, SequenceOffsetType> currentOffsets =
+ Stream.concat(
+ activelyReadingTaskGroups
+ .values()
+ .stream()
+ .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
+ .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()),
+ pendingCompletionTaskGroups
+ .values()
+ .stream()
+ .flatMap(taskGroups -> taskGroups.stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
+ .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
+ ).collect(Collectors.toMap(
Entry::getKey,
Entry::getValue,
(v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
));
+
+ partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(partitionId, offsetsFromMetadataStorage.get(partitionId)));
+ return currentOffsets;
}
- // nothing is running but we are not suspended, so lets just hang out in case we get called while things start up
- return ImmutableMap.of();
- } else {
- // if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist
- return getOffsetsFromMetadataStorage();
}
+ // if supervisor is suspended or is idle and nothing is running, use offsets in metadata, if exist
+ return offsetsFromMetadataStorage;
}
private OrderedSequenceNumber<SequenceOffsetType> makeSequenceNumber(SequenceOffsetType seq)
@@ -3977,7 +4045,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected void emitLag()
{
- if (spec.isSuspended() || !stateManager.isSteadyState()) {
+ if (spec.isSuspended() || !(stateManager.isSteadyState() || stateManager.isIdle())) {
// don't emit metrics if supervisor is suspended or not in a healthy running state
// (lag should still available in status report)
return;
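The revised `getHighestCurrentOffsets()` now merges offsets from both actively reading and publishing (pending-completion) task groups, keeps the highest offset per partition, and fills any partition no task reports with the offset from metadata storage; with no task groups at all it returns the metadata-storage offsets instead of an empty map. The sketch below simplifies that, assuming Kafka-style `Integer` partition IDs and `Long` offsets whose natural ordering stands in for `makeSequenceNumber(...).compareTo(...)`, and omitting the suspension check; `HighestOffsetsSketch` and its parameters are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical simplification of getHighestCurrentOffsets() for Long offsets. */
final class HighestOffsetsSketch
{
  static Map<Integer, Long> highestCurrentOffsets(
      List<Map<Integer, Long>> activelyReadingTaskOffsets,
      List<Map<Integer, Long>> publishingTaskOffsets,
      Set<Integer> partitionIds,
      Map<Integer, Long> offsetsFromMetadataStorage
  )
  {
    // No reading or publishing tasks (e.g. an idle supervisor): use committed offsets.
    if (activelyReadingTaskOffsets.isEmpty() && publishingTaskOffsets.isEmpty()) {
      return offsetsFromMetadataStorage;
    }
    // Flatten every task's per-partition offsets; on collisions keep the highest one.
    Map<Integer, Long> current = Stream.concat(activelyReadingTaskOffsets.stream(), publishingTaskOffsets.stream())
        .flatMap(taskOffsets -> taskOffsets.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max, HashMap::new));
    // Partitions that no task is reading fall back to the committed offsets.
    partitionIds.forEach(id -> current.putIfAbsent(id, offsetsFromMetadataStorage.get(id)));
    return current;
  }
}
```

Passing `HashMap::new` as the map supplier keeps the merged map mutable, since plain `Collectors.toMap` makes no mutability guarantee and the fallback step relies on `putIfAbsent`.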
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 6ddb018d22..700236a741 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -48,6 +48,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Optional<Duration> earlyMessageRejectionPeriod;
private final Optional<DateTime> lateMessageRejectionStartDateTime;
@Nullable private final AutoScalerConfig autoScalerConfig;
+ @Nullable private final IdleConfig idleConfig;
public SeekableStreamSupervisorIOConfig(
String stream,
@@ -62,7 +63,8 @@ public abstract class SeekableStreamSupervisorIOConfig
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
@Nullable AutoScalerConfig autoScalerConfig,
- DateTime lateMessageRejectionStartDateTime
+ DateTime lateMessageRejectionStartDateTime,
+ @Nullable IdleConfig idleConfig
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
@@ -97,6 +99,8 @@ public abstract class SeekableStreamSupervisorIOConfig
+ "both properties lateMessageRejectionStartDateTime "
+ "and lateMessageRejectionPeriod.");
}
+
+ this.idleConfig = idleConfig;
}
private static Duration defaultDuration(final Period period, final String theDefault)
@@ -188,4 +192,11 @@ public abstract class SeekableStreamSupervisorIOConfig
{
return lateMessageRejectionStartDateTime;
}
+
+ @Nullable
+ @JsonProperty
+ public IdleConfig getIdleConfig()
+ {
+ return idleConfig;
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 7116b88137..7f67393fc9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
@@ -77,6 +78,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -865,7 +867,10 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
- null, null, null
+ null,
+ null,
+ null,
+ null
)
{
};
@@ -902,6 +907,75 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
autoScaler.stop();
}
+ @Test
+ public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled()
+ {
+ SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig(
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
+ 1,
+ 1,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ new IdleConfig(true, null)
+ ){
+ };
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ spec = new SeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ supervisorStateManagerConfig
+ )
+ {
+ @Override
+ public Supervisor createSupervisor()
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+ {
+ return null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return null;
+ }
+
+ @Override
+ public String getSource()
+ {
+ return null;
+ }
+ };
+
+ Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
+ Assert.assertEquals(600000L, spec.getIoConfig().getIdleConfig().getInactiveAfterMillis());
+ }
+
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();
@@ -937,7 +1011,9 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
new Period("PT30M"),
null,
null,
- mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null
+ mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class),
+ null,
+ null
)
{
};
@@ -954,7 +1030,9 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
new Period("PT30M"),
null,
null,
- mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null
+ mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
+ null,
+ null
)
{
};
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
index e19dc6354d..4d4eaccb60 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
@@ -98,6 +98,43 @@ public class SeekableStreamSupervisorStateManagerTest
Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
}
+ @Test
+ public void testIdlePath()
+ {
+ Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState().getBasicState());
+ stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
+ Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+ stateManager.maybeSetState(SeekableStreamState.DISCOVERING_INITIAL_TASKS);
+ Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+ stateManager.maybeSetState(SeekableStreamState.CREATING_TASKS);
+ Assert.assertEquals(SeekableStreamState.CREATING_TASKS, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+ stateManager.markRunFinished();
+ Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+ // Emulates submitting Idle notice
+ stateManager.maybeSetState(BasicState.IDLE);
+ Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState().getBasicState());
+
+ // Stay in idle state when supervisor is running until or unless it is specifically set to a different state
+ stateManager.markRunFinished();
+ Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.IDLE, stateManager.getSupervisorState().getBasicState());
+
+ stateManager.maybeSetState(BasicState.RUNNING);
+ Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+ }
+
@Test
public void testStoppingPath()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 8d56b757df..ac767a4e90 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -465,6 +465,95 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testIdleStateTransition() throws Exception
+ {
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
+ 1,
+ 1,
+ new Period("PT1H"),
+ new Period("PT1S"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ new IdleConfig(true, 200L)
+ )
+ {
+ }).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
+ @Override
+ public Duration getEmitterPeriod()
+ {
+ return new Period("PT1S").toStandardDuration();
+ }
+ }).anyTimes();
+ EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+ replayAll();
+
+ SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+
+ Assert.assertTrue(supervisor.stateManager.isHealthy());
+ Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
+ Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+ Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+ supervisor.runInternal();
+
+ Assert.assertTrue(supervisor.stateManager.isHealthy());
+ Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+ Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+ Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+ Thread.sleep(100L);
+ supervisor.runInternal();
+
+ Assert.assertTrue(supervisor.stateManager.isHealthy());
+ Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+ Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+ Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+ Thread.sleep(100L);
+ supervisor.runInternal();
+
+ Assert.assertTrue(supervisor.stateManager.isHealthy());
+ Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
+ Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+ Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+ Thread.sleep(100L);
+ supervisor.runInternal();
+
+ Assert.assertTrue(supervisor.stateManager.isHealthy());
+ Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
+ Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
+ Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+ Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+ verifyAll();
+ }
+
@Test
public void testCreatingTasksFailRecoveryFail() throws Exception
{
@@ -908,6 +997,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
+ null,
null
)
{
@@ -964,7 +1054,10 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
- null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null
+ null,
+ OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
+ null,
+ null
)
{
};
@@ -1339,7 +1432,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
@Override
public LagStats computeLagStats()
{
- return null;
+ return new LagStats(0, 0, 0);
}
}
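The idle check added to `SeekableStreamSupervisor` (`checkIfStreamInactiveAndTurnSupervisorIdle`) reduces to a small state machine, which `testIdleStateTransition` above drives with `inactiveAfterMillis = 200` and roughly 100 ms between runs. The sketch below re-creates that logic outside Druid, assuming an injectable `LongSupplier` clock in place of `Instant.now()`, a caller-supplied total lag in place of `computeLagStats()`, and a starting state simplified to `RUNNING`; `IdleDetectorSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Hypothetical re-creation of checkIfStreamInactiveAndTurnSupervisorIdle():
 * turns IDLE once the latest stream offsets are unchanged, total lag is zero,
 * and that has lasted longer than inactiveAfterMillis.
 */
final class IdleDetectorSketch
{
  enum State { RUNNING, IDLE }

  private final long inactiveAfterMillis;
  private final LongSupplier clockMillis;
  private final Map<Integer, Long> previousSequencesFromStream = new HashMap<>();
  private long lastActiveTimeMillis; // 0 until the first check records a baseline
  private State state = State.RUNNING;

  IdleDetectorSketch(long inactiveAfterMillis, LongSupplier clockMillis)
  {
    this.inactiveAfterMillis = inactiveAfterMillis;
    this.clockMillis = clockMillis;
  }

  State check(Map<Integer, Long> latestSequencesFromStream, long totalLag)
  {
    long now = clockMillis.getAsLong();
    boolean idle;
    long idleTime;
    if (lastActiveTimeMillis > 0
        && previousSequencesFromStream.equals(latestSequencesFromStream)
        && totalLag == 0) {
      // Nothing new in the stream and nothing left to read: measure for how long.
      idleTime = now - lastActiveTimeMillis;
      idle = true;
    } else {
      // Activity (or the first run): restart the inactivity timer.
      idleTime = 0L;
      lastActiveTimeMillis = now;
      idle = false;
    }
    // Snapshot the offsets so the next run can compare against them.
    previousSequencesFromStream.clear();
    previousSequencesFromStream.putAll(latestSequencesFromStream);

    if (!idle) {
      state = State.RUNNING;
    } else if (state != State.IDLE && idleTime > inactiveAfterMillis) {
      state = State.IDLE;
    }
    return state;
  }
}
```

Note the `lastActiveTimeMillis > 0` guard: the first check only records a baseline, so a fake clock must start above zero (real epoch milliseconds always do). Because the idle time must strictly exceed `inactiveAfterMillis`, a 200 ms threshold is not crossed at 100 ms, which is why the test needs a third run before it can assert `IDLE`.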
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index c98d249803..9ea1b5c402 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -79,12 +79,16 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
+ private static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE =
+ "supervisor_with_idle_behaviour_enabled_spec_template.json";
protected static final String DATA_RESOURCE_ROOT = "/stream/data";
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE);
+ protected static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH =
+ String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE);
protected static final String SERIALIZER_SPEC_DIR = "serializer";
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
@@ -332,6 +336,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
+ String dataSource = generatedTestConfig.getFullDatasourceName();
// Start generating half of the data
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
@@ -358,7 +363,10 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// wait for autoScaling task numbers from 1 to 2.
ITRetryUtil.retryUntil(
- () -> indexer.getRunningTasks().size() == 2,
+ () -> indexer.getRunningTasks()
+ .stream()
+ .filter(taskResponseObject -> taskResponseObject.getId().contains(dataSource))
+ .count() == 2,
true,
10000,
50,
@@ -378,6 +386,79 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
}
}
+ protected void doTestIndexDataWithIdleConfigEnabled(@Nullable Boolean transactionEnabled) throws Exception
+ {
+ final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+ INPUT_FORMAT,
+ getResourceAsString(JSON_INPUT_FORMAT_PATH)
+ );
+ try (
+ final Closeable closer = createResourceCloser(generatedTestConfig);
+ final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+ ) {
+ final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+ .apply(getResourceAsString(SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH));
+ LOG.info("supervisorSpec: [%s]\n", taskSpec);
+ // Start supervisor
+ generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+ LOG.info("Submitted supervisor");
+ String dataSource = generatedTestConfig.getFullDatasourceName();
+ // Start generating half of the data
+ int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+ int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+ secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+ final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+ new JsonEventSerializer(jsonMapper),
+ EVENTS_PER_SECOND,
+ CYCLE_PADDING_MS
+ );
+ long numWritten = streamGenerator.run(
+ generatedTestConfig.getStreamName(),
+ streamEventWriter,
+ secondsToGenerateFirstRound,
+ FIRST_EVENT_TIME
+ );
+ // Verify supervisor is healthy before suspension
+ ITRetryUtil.retryUntil(
+ () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+ true,
+ 10000,
+ 30,
+ "Waiting for supervisor to be healthy"
+ );
+
+ ITRetryUtil.retryUntil(
+ () -> SupervisorStateManager.BasicState.IDLE.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+ true,
+ 10000,
+ 30,
+ "Waiting for supervisor to be idle"
+ );
+
+ // wait for no more creation of indexing tasks.
+ ITRetryUtil.retryUntil(
+ () -> indexer.getRunningTasks()
+ .stream()
+ .noneMatch(taskResponseObject -> taskResponseObject.getId().contains(dataSource)),
+ true,
+ 10000,
+ 50,
+ "wait for no more creation of indexing tasks"
+ );
+
+ // Start generating the remaining half of the data
+ numWritten += streamGenerator.run(
+ generatedTestConfig.getStreamName(),
+ streamEventWriter,
+ secondsToGenerateRemaining,
+ FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+ );
+
+ // Verify that supervisor can catch up with the stream
+ verifyIngestedData(generatedTestConfig, numWritten);
+ }
+ }
+
protected void doTestTerminatedSupervisorAutoCleanup(@Nullable Boolean transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig1 = new GeneratedTestConfig(
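
The idle integration test above waits on three conditions in turn (supervisor RUNNING, supervisor IDLE, no running tasks for the datasource) through `ITRetryUtil.retryUntil(condition, expectedValue, delayMillis, retryCount, message)`. A minimal sketch of that polling pattern follows, assuming only the argument order visible in those calls; `RetryUntilSketch` is a hypothetical class, not Druid's `ITRetryUtil`, and its failure message is illustrative.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of a retry-until-expected polling helper; not Druid's ITRetryUtil.
public class RetryUntilSketch
{
  /**
   * Polls {@code condition} until it returns {@code expectedValue}, sleeping
   * {@code delayMillis} between attempts. Returns the number of attempts used,
   * or throws once {@code retryCount} attempts have all failed.
   */
  public static int retryUntil(
      Callable<Boolean> condition,
      boolean expectedValue,
      long delayMillis,
      int retryCount,
      String taskMessage
  ) throws Exception
  {
    for (int attempt = 1; attempt <= retryCount; attempt++) {
      if (Boolean.valueOf(expectedValue).equals(condition.call())) {
        return attempt;
      }
      if (attempt < retryCount) {
        Thread.sleep(delayMillis);
      }
    }
    throw new IllegalStateException(
        "Condition not met after " + retryCount + " attempts: " + taskMessage
    );
  }

  public static void main(String[] args) throws Exception
  {
    final long start = System.currentTimeMillis();
    // Succeeds once roughly 50 ms have elapsed, polling every 10 ms.
    int attempts = retryUntil(
        () -> System.currentTimeMillis() - start >= 50,
        true,
        10,
        100,
        "waiting for 50 ms to elapse"
    );
    System.out.println("Condition met after " + attempts + " attempts");
  }
}
```

Assuming the real utility also sleeps `delayMillis` between attempts, the test's `10000` ms delay with `30` retries lets each supervisor-state check wait roughly 300 seconds, and the task check (`50` retries) roughly 500 seconds, before failing.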
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 936e7a177d..e08b6c0ebe 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -62,6 +62,12 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
doTestIndexDataWithAutoscaler(false);
}
+ @Test
+ public void testIndexDataWithIdleConfigEnabled() throws Exception
+ {
+ doTestIndexDataWithIdleConfigEnabled(false);
+ }
+
/**
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
* and supervisor maintained and scoped within this test only
diff --git a/integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json
new file mode 100644
index 0000000000..2f897d26c3
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json
@@ -0,0 +1,62 @@
+{
+ "type": "%%STREAM_TYPE%%",
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "parser": %%PARSER%%,
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ },
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "MINUTE",
+ "queryGranularity": "NONE"
+ }
+ },
+ "tuningConfig": {
+ "type": "%%STREAM_TYPE%%",
+ "intermediatePersistPeriod": "PT30S",
+ "maxRowsPerSegment": 5000000,
+ "maxRowsInMemory": 500000
+ },
+ "ioConfig": {
+ "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+ "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+ "autoScalerConfig": null,
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT120S",
+ "%%USE_EARLIEST_KEY%%": true,
+ "inputFormat" : %%INPUT_FORMAT%%,
+ "idleConfig": {
+ "enabled": true,
+ "inactiveAfterMillis": 10000
+ }
+ }
+}
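
The template turns idle behaviour on through `ioConfig.idleConfig` with `enabled: true` and `inactiveAfterMillis: 10000`. The commit message says the supervisor (not the autoscaler) decides when it is idle, keeps a snapshot of previous offsets, compares against the highest current offsets of running and publishing tasks, and does not act while suspended. The sketch below models that decision under those assumptions only; `IdleCheckSketch`, `shouldBeIdle`, and the partition-to-offset maps are hypothetical, and the actual `SeekableStreamSupervisor` logic is not part of this excerpt.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an idleConfig-driven idle check; names are illustrative,
// not Druid's SeekableStreamSupervisor internals.
public class IdleCheckSketch
{
  private final boolean enabled;
  private final long inactiveAfterMillis;
  // Stream offsets seen on the previous check (partition -> offset).
  private final Map<Integer, Long> previousStreamOffsets = new HashMap<>();
  // Last time new or not-yet-ingested data was observed; -1 until the first check.
  private long lastActiveTimeMillis = -1;

  public IdleCheckSketch(boolean enabled, long inactiveAfterMillis)
  {
    this.enabled = enabled;
    this.inactiveAfterMillis = inactiveAfterMillis;
  }

  /**
   * Returns true if the supervisor should be considered idle at {@code nowMillis}.
   *
   * @param latestStreamOffsets    latest offset per partition reported by the stream
   * @param highestIngestedOffsets highest offset per partition reached by running or publishing tasks
   * @param suspended              whether the supervisor is suspended
   */
  public boolean shouldBeIdle(
      Map<Integer, Long> latestStreamOffsets,
      Map<Integer, Long> highestIngestedOffsets,
      boolean suspended,
      long nowMillis
  )
  {
    if (!enabled || suspended) {
      // Idle behaviour is opt-in and is not applied to a suspended supervisor.
      return false;
    }
    boolean streamUnchanged = latestStreamOffsets.equals(previousStreamOffsets);
    boolean caughtUp = latestStreamOffsets.equals(highestIngestedOffsets);

    // Snapshot the stream offsets for comparison on the next check.
    previousStreamOffsets.clear();
    previousStreamOffsets.putAll(latestStreamOffsets);

    if (lastActiveTimeMillis < 0 || !streamUnchanged || !caughtUp) {
      // First check, new data, or data not yet ingested: reset the inactivity clock.
      lastActiveTimeMillis = nowMillis;
      return false;
    }
    return nowMillis - lastActiveTimeMillis > inactiveAfterMillis;
  }
}
```

With `inactiveAfterMillis = 10000`, a stream whose offsets stop moving and whose tasks have caught up is reported idle on the first check made more than 10 seconds after the last change; any new offset resets the clock, which is what lets the test's second round of events bring the supervisor back to work.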
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
index 346fcd20de..1ea36229c3 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -62,6 +62,7 @@ public class SupervisorStateManager
PENDING(true, true),
RUNNING(true, false),
+ IDLE(true, false),
SUSPENDED(true, false),
STOPPING(true, false);
@@ -154,10 +155,11 @@ public class SupervisorStateManager
return;
}
- // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
+ // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run
// yet, refuse to switch and prefer the more specific states used for first run (CONNECTING_TO_STREAM,
// DISCOVERING_INITIAL_TASKS, CREATING_TASKS, etc.)
- if (healthySteadyState.equals(proposedState) && !atLeastOneSuccessfulRun) {
+ if ((healthySteadyState.equals(proposedState) || BasicState.IDLE.equals(proposedState))
+ && !atLeastOneSuccessfulRun) {
return;
}
@@ -196,11 +198,13 @@ public class SupervisorStateManager
consecutiveSuccessfulRuns = currentRunSuccessful ? consecutiveSuccessfulRuns + 1 : 0;
consecutiveFailedRuns = currentRunSuccessful ? 0 : consecutiveFailedRuns + 1;
- // Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun
+ // If the supervisor is not IDLE, try to set the state to RUNNING or SUSPENDED.
+ // This will be rejected if we haven't had atLeastOneSuccessfulRun
// (in favor of the more specific states for the initial run) and will instead trigger setting the state to an
// unhealthy one if we are now over the error thresholds.
- maybeSetState(healthySteadyState);
-
+ if (!isIdle()) {
+ maybeSetState(healthySteadyState);
+ }
// reset for next run
currentRunSuccessful = true;
}
@@ -230,6 +234,11 @@ public class SupervisorStateManager
return atLeastOneSuccessfulRun;
}
+ public boolean isIdle()
+ {
+ return SupervisorStateManager.BasicState.IDLE.equals(supervisorState);
+ }
+
protected Deque<ExceptionEvent> getRecentEventsQueue()
{
return recentEventsQueue;
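
The two `SupervisorStateManager` changes amount to a small guard: IDLE, like the healthy steady state, is refused until at least one run has succeeded, and `markRunFinished()` no longer pushes an IDLE supervisor back to RUNNING or SUSPENDED. A simplified, hypothetical model of only that behaviour follows (no error thresholds, unhealthy states, or exception tracking, and every run is assumed to succeed); `StateGuardSketch` is not the real class, and its `main` extends the sequence checked in `SupervisorStateManagerTest` below.

```java
// Simplified, hypothetical model of the IDLE handling added to SupervisorStateManager.
// Error thresholds, unhealthy states and exception tracking are deliberately omitted,
// and every run is assumed to succeed.
public class StateGuardSketch
{
  public enum State
  {
    PENDING, RUNNING, IDLE, SUSPENDED
  }

  private final State healthySteadyState;
  private State state = State.PENDING;
  private boolean atLeastOneSuccessfulRun = false;

  public StateGuardSketch(boolean suspended)
  {
    // Assumed: a suspended supervisor's steady state is SUSPENDED, otherwise RUNNING.
    this.healthySteadyState = suspended ? State.SUSPENDED : State.RUNNING;
  }

  public void maybeSetState(State proposed)
  {
    // The steady state and IDLE are both refused until one run has succeeded.
    if ((proposed == healthySteadyState || proposed == State.IDLE) && !atLeastOneSuccessfulRun) {
      return;
    }
    state = proposed;
  }

  public void markRunFinished()
  {
    atLeastOneSuccessfulRun = true;
    // An IDLE supervisor is left IDLE instead of being reset to the steady state.
    if (state != State.IDLE) {
      maybeSetState(healthySteadyState);
    }
  }

  public State getState()
  {
    return state;
  }

  public static void main(String[] args)
  {
    StateGuardSketch manager = new StateGuardSketch(false);
    manager.maybeSetState(State.IDLE);      // refused: no successful run yet
    System.out.println(manager.getState()); // PENDING
    manager.markRunFinished();
    System.out.println(manager.getState()); // RUNNING
    manager.maybeSetState(State.IDLE);
    manager.markRunFinished();
    System.out.println(manager.getState()); // IDLE
  }
}
```

Keeping the IDLE check inside `markRunFinished()` rather than in `maybeSetState()` means the supervisor's own logic can still move it from IDLE back to RUNNING explicitly once new data shows up.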
diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
new file mode 100644
index 0000000000..01bdeb9e7d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SupervisorStateManagerTest
+{
+ @Test
+ public void testMarkRunFinishedIfSupervisorIsIdle()
+ {
+ SupervisorStateManager supervisorStateManager = new SupervisorStateManager(
+ new SupervisorStateManagerConfig(),
+ false
+ );
+
+ supervisorStateManager.markRunFinished();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, supervisorStateManager.getSupervisorState());
+
+ supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE);
+ supervisorStateManager.markRunFinished();
+
+ Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisorStateManager.getSupervisorState());
+ }
+}