techdocsmith commented on code in PR #15591:
URL: https://github.com/apache/druid/pull/15591#discussion_r1446526062


##########
docs/api-reference/supervisor-api.md:
##########
@@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Creates a new supervisor or updates an existing one for the same datasource 
with a new schema and configuration.

Review Comment:
   "the same datasource" reads ambiguously here. Consider:
   ```suggestion
   Creates a new supervisor spec or updates an existing one with new 
configuration and schema information. When updating a supervisor spec, the 
datasource must remain the same as the previous supervisor.
   ```
   or something similar
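   
   Either way, the request shape is unchanged; a minimal sketch of the create-or-update call, assuming a local Overlord on port 8090 and a spec file named `supervisor-spec.json` (both placeholders, matching the curl example already in these docs):
   ```
   # POST the spec; Druid creates the supervisor, or replaces the existing one
   # for the same datasource (host, port, and file name are placeholders)
   curl -X POST -H 'Content-Type: application/json' \
     -d @supervisor-spec.json \
     http://localhost:8090/druid/indexer/v1/supervisor
   ```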



##########
docs/api-reference/supervisor-api.md:
##########
@@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Creates a new supervisor or updates an existing one for the same datasource 
with a new schema and configuration.
 
-You can define a supervisor spec for [Apache 
Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec)
 or [Amazon 
Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) 
streaming ingestion methods. Once created, the supervisor persists in the 
metadata database.
+You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database.

Review Comment:
   Do we need to mention that the supervisor persists in the metadata database? How does this affect my behavior as a user?
   
   It has some potential for confusion because there is a record in the metadata store, but there is also a process (task?) running on the file system that periodically launches ingestion tasks.



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3063,10 +3245,16 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 ### Reset a supervisor
 
-Resets the specified supervisor. This endpoint clears _all_ stored offsets in 
Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data 
reading. The supervisor will start from the earliest or latest available 
position, depending on the platform (offsets in Kafka or sequence numbers in 
Kinesis). It kills and recreates active tasks to read from valid positions.
+The supervisor must be running for this endpoint to be available.
+
+Resets the specified supervisor. This endpoint clears all stored offsets in 
Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data 
reading. The supervisor will start from the earliest or latest available 
position, depending on the platform (offsets in Kafka or sequence numbers in 
Kinesis). 
+After clearing all stored offsets in Kafka or sequence numbers in Kinesis, the 
supervisor kills and recreates active tasks,
+so that tasks begin reading from valid positions.
 
 Use this endpoint to recover from a stopped state due to missing offsets in 
Kafka or sequence numbers in Kinesis. Use this endpoint with caution as it may 
result in skipped messages and lead to data loss or duplicate data.
 
+The indexing service keeps track of the latest persisted offsets in Kafka or 
sequence numbers in Kinesis to provide exactly-once ingestion guarantees across 
tasks. Subsequent tasks must start reading from where the previous task 
completed for the generated segments to be accepted. If the messages at the 
expected starting offsets in Kafka or sequence numbers in Kinesis are no longer 
available (typically because the message retention period has elapsed or the 
topic was removed and re-created) the supervisor will refuse to start and in 
flight tasks will fail. This endpoint enables you to recover from this 
condition.

Review Comment:
   ```suggestion
   The indexing service keeps track of the latest persisted offsets in Kafka or 
sequence numbers in Kinesis to provide exactly-once ingestion guarantees across 
tasks. Subsequent tasks must start reading from where the previous task 
completed for Druid to accept the generated segments. If the messages at the 
expected starting offsets in Kafka or sequence numbers in Kinesis are no longer 
available, the supervisor refuses to start and in-flight tasks will fail. 
Possible causes for missing messages include the message retention period 
elapsing or the topic being removed and re-created. Use the `reset` endpoint to 
recover from this condition.
   ```
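   
   To make the recovery step concrete, a sketch of the call, assuming the `ROUTER_IP:ROUTER_PORT` placeholders used elsewhere on this page and the `social_media` supervisor from the samples:
   ```
   # Hard reset: clears ALL stored offsets (Kafka) or sequence numbers (Kinesis)
   curl -X POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/reset"
   ```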



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3180,8 +3374,8 @@ The following table defines the fields within the 
`partitions` object in the res
 
 #### Sample request
 
-The following example shows how to reset offsets for a kafka supervisor with 
the name `social_media`. Let's say the supervisor is reading
-from a kafka topic `ads_media_stream` and has the stored offsets: `{"0": 0, 
"1": 10, "2": 20, "3": 40}`.
+The following example shows how to reset offsets for a Kafka supervisor with 
the name `social_media`. Let's say the supervisor is reading

Review Comment:
   ```suggestion
   The following example shows how to reset offsets for a Kafka supervisor with 
the name `social_media`. For example, the supervisor is reading
   ```
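   
   It could also help to carry the example through; if I'm reading the endpoint right, the request body pairs each partition with its new offset, roughly like the following sketch (field names are my understanding of the `resetOffsets` payload, worth verifying):
   ```json
   {
     "type": "kafka",
     "partitions": {
       "type": "end",
       "stream": "ads_media_stream",
       "partitionOffsetMap": {
         "0": 100,
         "2": 650
       }
     }
   }
   ```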



##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec 
for Kafka that uses the `JS
 }
 ```
 
-### Kafka input format supervisor spec example
-
-If you want to parse the Kafka metadata fields in addition to the Kafka 
payload value contents, you can use the `kafka` input format.
-
-The `kafka` input format wraps around the payload parsing input format and 
augments the data it outputs with the Kafka event timestamp,
-the Kafka topic name, the Kafka event headers, and the key field that itself 
can be parsed using any available InputFormat.
-
-For example, consider the following structure for a Kafka message that 
represents a fictitious wiki edit in a development environment:
-
-- **Kafka timestamp**: `1680795276351`
-- **Kafka topic**: `wiki-edits`
-- **Kafka headers**:
-  - `env=development`
-  - `zone=z1`
-- **Kafka key**: `wiki-edit`
-- **Kafka payload value**: 
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo 
Toraut","delta":31,"namespace":"Main"}`
-
-Using `{ "type": "json" }` as the input format would only parse the payload 
value.
-To parse the Kafka metadata in addition to the payload, use the `kafka` input 
format.
-
-You would configure it as follows:
-
-- `valueFormat`: Define how to parse the payload value. Set this to the 
payload parsing input format (`{ "type": "json" }`).
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the 
Druid schema to avoid conflicts with columns from the payload. The default is 
`kafka.timestamp`.
-- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid 
schema to avoid conflicts with columns from the payload. The default is 
`kafka.topic`. This field is useful when ingesting data from multiple topics 
into same datasource.
-- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding 
from the Kafka header.
-   Other supported encoding formats include the following:
-   - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
-   - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin 
block of the Unicode character set.
-   - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by 
an optional byte-order mark.
-   - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order.
-   - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte 
order.
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any 
conflicts with columns from the payload. The default is `kafka.header.`.
-  Considering the header from the example, Druid maps the headers to the 
following columns: `kafka.header.env`, `kafka.header.zone`.
-- `keyFormat`: Supply an input format to parse the key. Only the first value 
will be used.
-  If, as in the example, your key values are simple strings, then you can use 
the `tsv` format to parse them.
-  ```
-  {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  } 
-  ```
-  Note that for `tsv`,`csv`, and `regex` formats, you need to provide a 
`columns` array to make a valid input format. Only the first one is used, and 
its name will be ignored in favor of `keyColumnName`.
-- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts 
with columns from the payload. The default is `kafka.key`.
+</details>
 
-Putting it together, the following input format (that uses the default values 
for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and 
`keyColumnName`)
+### I/O configuration
 
-```json
-{
-  "type": "kafka",
-  "valueFormat": {
-    "type": "json"
-  },
-  "headerFormat": {
-    "type": "string"
-  },
-  "keyFormat": {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  }
-}
-```
+The following table outlines the configuration options for `ioConfig`:
 
-would parse the example message as follows:
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`topic`|String|The Kafka topic to read from. Must be a specific topic. Druid does not support topic patterns. To ingest data from multiple topics, see [Ingest from multiple topics](#ingest-from-multiple-topics).|Yes||
+|`inputFormat`|Object|The [input 
format](../../ingestion/data-formats.md#input-format) to define input data 
parsing.|Yes||
+|`consumerProperties`|String, Object|A map of properties to pass to the Kafka 
consumer. See [Consumer properties](#consumer-properties) for details.|Yes||
+|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll 
records, in milliseconds.|No|100|
+|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replica tasks to different workers to provide resiliency against process failure.|No|1|
+|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The 
maximum number of reading tasks equals `taskCount * replicas`. The total number 
of tasks, reading and publishing, is greater than this count. See [Capacity 
planning](../../ingestion/supervisor.md#capacity-planning) for more details. 
When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is 
less than the `taskCount` value.|No|1|
+|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading 
and begin publishing segments.|No|PT1H|
+|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts 
managing tasks.|No|PT5S|
+|`period`|ISO 8601 period|Determines how often the supervisor executes its 
management logic. Note that the supervisor also runs in response to certain 
events, such as tasks succeeding, failing, and reaching their task duration. 
The `period` value specifies the maximum time between iterations.|No|PT30S|
+|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the 
first time, it obtains a set of starting offsets from Kafka. This flag 
determines whether it retrieves the earliest or latest offsets in Kafka. Under 
normal circumstances, subsequent tasks start from where the previous segments 
ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`|
+|`completionTimeout`|ISO 8601 period|The length of time to wait before 
declaring a publishing task as failed and terminating it. If the value is too 
low, your tasks may never publish. The publishing clock for a task begins 
roughly after `taskDuration` elapses.|No|PT30M|
+|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to 
reject messages with timestamps earlier than this date time. For example, if 
this property is set to `2016-01-01T11:00Z` and the supervisor creates a task 
at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments, such as a realtime and a nightly batch ingestion pipeline.|No||
+|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps earlier than this period before the task was created. 
For example, if this property is set to `PT1H` and the supervisor creates a 
task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This may help prevent concurrency issues if your data 
stream has late messages and you have multiple pipelines that need to operate 
on the same segments, such as a realtime and a nightly batch ingestion 
pipeline. Note that you can specify only one of the late message rejection 
properties.|No||
+|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps later than this period after the task reached its task 
duration. For example, if this property is set to `PT1H`, the task duration is 
set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid 
drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes 
run past their task duration, such as in cases of supervisor failover. Setting 
`earlyMessageRejectionPeriod` too low may cause Druid to drop messages 
unexpectedly whenever a task runs past its originally configured task 
duration.|No||
+|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. 
See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more 
information.|No|null|
+|`idleConfig`|Object|Defines how and when the Kafka supervisor can become 
idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for 
more details.|No|null|
 
-```json
-{
-  "channel": "#sv.wikipedia",
-  "timestamp": "2016-06-27T00:00:11.080Z",
-  "page": "Salo Toraut",
-  "delta": 31,
-  "namespace": "Main",
-  "kafka.timestamp": 1680795276351,
-  "kafka.topic": "wiki-edits",
-  "kafka.header.env": "development",
-  "kafka.header.zone": "z1",
-  "kafka.key": "wiki-edit"
-}
-```
+#### Consumer properties
 
-For more information on data formats, see [Data 
formats](../../ingestion/data-formats.md).
+Consumer properties must contain a property `bootstrap.servers` with a list of 
Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.
+By default, `isolation.level` is set to `read_committed`. If you use older 
versions of Kafka servers without transactions support or don't want Druid to 
consume only committed transactions, set `isolation.level` to 
`read_uncommitted`.
 
-Finally, add these Kafka metadata columns to the `dimensionsSpec` or  set your 
`dimensionsSpec` to auto-detect columns.
-     
-The following supervisor spec demonstrates how to ingest the Kafka header, 
key, timestamp, and topic into Druid dimensions:
+In some cases, you may need to fetch consumer properties at runtime, such as when `bootstrap.servers` is not known upfront or is not static. To enable SSL connections, you must provide passwords for `keystore`, `truststore`, and `key` secretly. You can provide configurations at runtime with a dynamic config provider implementation like the environment variable config provider that comes with Druid. For more information, see [Dynamic config provider](../../operations/dynamic-config-provider.md).
+
+For example, if you are using SASL and SSL with Kafka, set the following 
environment variables for the Druid user on the machines running the Overlord 
and the Peon services:
 
 ```
-{
-  "type": "kafka",
-  "spec": {
-    "ioConfig": {
-      "type": "kafka",
-      "consumerProperties": {
-        "bootstrap.servers": "localhost:9092"
-      },
-      "topic": "wiki-edits",
-      "inputFormat": {
-        "type": "kafka",
-        "valueFormat": {
-          "type": "json"
-        },
-        "headerFormat": {
-          "type": "string"
-        },
-        "keyFormat": {
-          "type": "tsv",
-          "findColumnsFromHeader": false,
-          "columns": ["x"]
-        }
-      },
-      "useEarliestOffset": true
-    },
-    "dataSchema": {
-      "dataSource": "wikiticker",
-      "timestampSpec": {
-        "column": "timestamp",
-        "format": "posix"
-      },
-      "dimensionsSpec":  "dimensionsSpec": {
-        "useSchemaDiscovery": true,
-        "includeAllDimensions": true
-      },
-      "granularitySpec": {
-        "queryGranularity": "none",
-        "rollup": false,
-        "segmentGranularity": "day"
+export 
KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule 
required username='admin_user' password='admin_password';"
+export SSL_KEY_PASSWORD=mysecretkeypassword
+export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword
+export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword
+```
+
+```json
+  "druid.dynamic.config.provider": {
+    "type": "environment",
+     "variables": {
+        "sasl.jaas.config": "KAFKA_JAAS_CONFIG",
+        "ssl.key.password": "SSL_KEY_PASSWORD",
+        "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
+        "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD"
       }
-    },
-    "tuningConfig": {
-      "type": "kafka"
-    }
   }
-}
 ```
 
-After Druid ingests the data, you can query the Kafka metadata columns as 
follows:
+Verify that you've changed the values for all configurations to match your own 
environment. In the Druid data loader interface, you can use the environment 
variable config provider syntax in the **Consumer properties** field on the 
**Connect tab**. When connecting to Kafka, Druid replaces the environment 
variables with their corresponding values.
 
-```sql
-SELECT
-  "kafka.header.env",
-  "kafka.key",
-  "kafka.timestamp",
-  "kafka.topic"
-FROM "wikiticker"
-```
+#### Task autoscaler
 
-This query returns:
+You can optionally configure autoscaling behavior for ingestion tasks using 
the `autoScalerConfig` property of the `ioConfig` object.
 
-| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` |
-|--------------------|-----------|---------------|---------------|
-| `development`      | `wiki-edit` | `1680795276351` | `wiki-edits`  |
+The following table outlines the configuration options for `autoScalerConfig`:
 
-For more information, see [`kafka` data 
format](../../ingestion/data-formats.md#kafka).
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`enableTaskAutoScaler`|Enables the auto scaler. If not specified, Druid 
disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`|
+|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= 
taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales 
reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is 
ignored.|Yes||
+|`taskCountMin`|Minimum number of ingestion tasks. When you enable the 
autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts 
with the `taskCountMin` number of tasks to launch.|Yes||
+|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two 
scale actions.| No|600000|
+|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the 
`lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more 
information.|No|`lagBased`|
 
-## Submit a supervisor spec
+##### Autoscaler strategy
 
-Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor 
spec to the following endpoint:
+The following table outlines the configuration options for 
`autoScalerStrategy`:
 
-`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`lagCollectionIntervalMillis`|The time period during which Druid collects lag 
metric points.|No|30000|
+|`lagCollectionRangeMillis`|The total time window of lag collection. Use with 
`lagCollectionIntervalMillis` to specify the intervals at which to collect lag 
metric points.|No|600000|
+|`scaleOutThreshold`|The threshold of scale out action. |No|6000000|
+|`triggerScaleOutFractionThreshold`|Enables scale out action if 
`triggerScaleOutFractionThreshold` percent of lag points is higher than 
`scaleOutThreshold`.|No|0.3|
+|`scaleInThreshold`|The threshold of scale in action.|No|1000000|
+|`triggerScaleInFractionThreshold`|Enables scale in action if 
`triggerScaleInFractionThreshold` percent of lag points is lower than 
`scaleOutThreshold`.|No|0.9|
+|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the 
supervisor starts before the first scale logic check.|No|300000|
+|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered.|No|60000|
+|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
+|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
 
-For example: 
+#### Idle supervisor configuration
 
-```
-curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json 
http://localhost:8090/druid/indexer/v1/supervisor
-```
+:::info
+Idle state transitioning is currently designated as experimental.
+:::
+
+When the supervisor enters the idle state, no new tasks are launched 
subsequent to the completion of the currently executing tasks. This strategy 
may lead to reduced costs for cluster operators while using topics that get 
sporadic data.
+
+The following table outlines the configuration options for `idleConfig`:
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|`enabled`|If `true`, the supervisor becomes idle if there is no data on input 
stream or topic for some time.|No|`false`|
+|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has 
been read from input topic and no new data has been published for 
`inactiveAfterMillis` milliseconds.|No|`600_000`|
+
+The following example shows a supervisor spec with `lagBased` autoscaler and 
idle configuration enabled:
+
+<details>
+  <summary>Click to view the example</summary>
 
-Where the file `supervisor-spec.json` contains your Kafka supervisor spec file.
+```json
+{
+    "type": "kafka",
+    "spec": {
+      "dataSchema": {
+        ...
+      },
+      "ioConfig": {
+         "topic": "metrics",
+         "inputFormat": {
+             "type": "json"
+         },
+          "consumerProperties": {
+             "bootstrap.servers": "localhost:9092"
+          },
+         "autoScalerConfig": {
+             "enableTaskAutoScaler": true,
+              "taskCountMax": 6,
+              "taskCountMin": 2,
+             "minTriggerScaleActionFrequencyMillis": 600000,
+             "autoScalerStrategy": "lagBased",
+              "lagCollectionIntervalMillis": 30000,
+              "lagCollectionRangeMillis": 600000,
+              "scaleOutThreshold": 6000000,
+              "triggerScaleOutFractionThreshold": 0.3,
+             "scaleInThreshold": 1000000,
+             "triggerScaleInFractionThreshold": 0.9,
+              "scaleActionStartDelayMillis": 300000,
+              "scaleActionPeriodMillis": 60000,
+              "scaleInStep": 1,
+             "scaleOutStep": 2
+         },
+         "taskCount":1,
+         "replicas":1,
+         "taskDuration":"PT1H",
+         "idleConfig": {
+           "enabled": true,
+           "inactiveAfterMillis": 600000 
+         }
+      },
+     "tuningConfig":{
+        ...
+     }
+    }
+}
+```
+</details>
+
+#### Ingest from multiple topics
+
+:::info
+If you enable multi-topic ingestion for a datasource, downgrading to a version 
older than
+28.0.0 will cause the ingestion for that datasource to fail.
+:::
+
+To ingest data from multiple topics, you set `topicPattern` instead of `topic` in the supervisor `ioConfig` object.
+You can pass multiple topics as a regex pattern as the value for `topicPattern` in `ioConfig`. For example, to
+ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioConfig`.
+Similarly, you can use `metrics-.*` as the value for `topicPattern` if you 
want to ingest from all the topics that
+start with `metrics-`. If new topics are added to the cluster that match the 
regex, Druid automatically starts
+ingesting from those new topics. A topic name that only matches partially such 
as `my-metrics-12` will not be

Review Comment:
   ```suggestion
   ingesting from those new topics. Topic names that match partially, such as 
`my-metrics-12`, will not be
   ```
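   
   A short snippet may also help readers see where the property goes; an abbreviated `ioConfig` sketch (other required fields elided):
   ```json
   "ioConfig": {
     "topicPattern": "metrics-.*",
     "inputFormat": { "type": "json" },
     "consumerProperties": { "bootstrap.servers": "localhost:9092" }
   }
   ```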



##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -719,9 +406,16 @@ If resharding occurs when the supervisor is suspended and 
`useEarliestSequence`
 
 ## Kinesis known issues
 
-Before you deploy the Kinesis extension to production, consider the following 
known issues:
+Before you deploy the `druid-kinesis-indexing-service` extension to 
production, consider the following known issues:

Review Comment:
   Might be worth checking with the Druid devs to see if any of these have been addressed.



##########
docs/api-reference/supervisor-api.md:
##########
@@ -2218,6 +2399,7 @@ Content-Length: 1359
 ### Suspend a running supervisor
 
 Suspends a single running supervisor. Returns the updated supervisor spec, 
where the `suspended` property is set to `true`. The suspended supervisor 
continues to emit logs and metrics.
+Indexing tasks remain suspended until the supervisor is resumed.

Review Comment:
   ```suggestion
   Indexing tasks remain suspended until the supervisor resumes.
   ```
   Consider active voice. We may need to specify that you have to call the resume endpoint.
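   
   For example, a sketch of the resume call, assuming the `social_media` supervisor and the Router placeholders used in this page's samples:
   ```
   # Resume a suspended supervisor so it can launch indexing tasks again
   curl -X POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resume"
   ```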



##########
docs/api-reference/supervisor-api.md:
##########
@@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Creates a new supervisor or updates an existing one for the same datasource 
with a new schema and configuration.
 
-You can define a supervisor spec for [Apache 
Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec)
 or [Amazon 
Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) 
streaming ingestion methods. Once created, the supervisor persists in the 
metadata database.
+You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database.
+
+The following table lists the properties of a supervisor spec:
+
+|Property|Type|Description|Required|
+|--------|----|-----------|--------|
+|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes|

Review Comment:
   ```suggestion
   |`type`|String|The supervisor type. One of `kafka` or `kinesis`.|Yes|
   ```
   I think "One of" is the convention for a list?



##########
docs/api-reference/supervisor-api.md:
##########
@@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Creates a new supervisor or updates an existing one for the same datasource 
with a new schema and configuration.
 
-You can define a supervisor spec for [Apache 
Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec)
 or [Amazon 
Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) 
streaming ingestion methods. Once created, the supervisor persists in the 
metadata database.
+You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database.
+
+The following table lists the properties of a supervisor spec:
+
+|Property|Type|Description|Required|
+|--------|----|-----------|--------|
+|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes|
+|`spec`|Object|The container object for the supervisor configuration.|Yes|
+|`ioConfig`|Object|The I/O configuration object to define the connection and 
I/O-related settings for the supervisor and indexing task.|Yes|
+|`dataSchema`|Object|The schema for the indexing task to use during ingestion. 
See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more 
information.|Yes|
+|`tuningConfig`|Object|The tuning configuration object to define 
performance-related settings for the supervisor and indexing tasks.|No|
 
 When you call this endpoint on an existing supervisor for the same datasource, 
the running supervisor signals its tasks to stop reading and begin publishing, 
exiting itself. Druid then uses the provided configuration from the request 
body to create a new supervisor. Druid submits a new schema while retaining 
existing publishing tasks and starts new tasks at the previous task offsets.

Review Comment:
   See comment line 2216. If you clarify there, I think you can remove "for the 
same datasource" here, which reads awkwardly.



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3128,13 +3316,19 @@ Host: http://ROUTER_IP:ROUTER_PORT
   ```
 </details>
 
-### Reset Offsets for a supervisor
+### Reset offsets for a supervisor
+
+The supervisor must be running for this endpoint to be available.
+
+Resets the specified offsets for partitions without resetting the entire set.
+
+This endpoint clears only the specified offsets in Kafka or sequence numbers 
in Kinesis, prompting the supervisor to resume reading data from the specified 
offsets.
+If there are no stored offsets, the specified offsets are set in the metadata 
store.
 
-Resets the specified offsets for a supervisor. This endpoint clears _only_ the 
specified offsets in Kafka or sequence numbers in Kinesis, prompting the 
supervisor to resume data reading.
-If there are no stored offsets, the specified offsets will be set in the 
metadata store. The supervisor will start from the reset offsets for the 
partitions specified and for the other partitions from the stored offset.
-It kills and recreates active tasks pertaining to the partitions specified to 
read from valid offsets.
+After resetting stored offsets, the supervisor kills and recreates any active tasks pertaining to the specified partitions,
+so that tasks begin reading from the specified offsets. For partitions that are not specified in this operation, the supervisor resumes from the last stored offset.
 
-Use this endpoint to selectively reset offsets for partitions without 
resetting the entire set.
+Use this endpoint with caution as it may result in skipped messages, leading 
to data loss or duplicate data.

Review Comment:
   ```suggestion
   Use this endpoint with caution. It can cause skipped messages, leading to 
data loss or duplicate data.
   ```



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3063,10 +3245,16 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 ### Reset a supervisor
 
-Resets the specified supervisor. This endpoint clears _all_ stored offsets in 
Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data 
reading. The supervisor will start from the earliest or latest available 
position, depending on the platform (offsets in Kafka or sequence numbers in 
Kinesis). It kills and recreates active tasks to read from valid positions.
+The supervisor must be running for this endpoint to be available.
+
+Resets the specified supervisor. This endpoint clears all stored offsets in 
Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data 
reading. The supervisor will start from the earliest or latest available 
position, depending on the platform (offsets in Kafka or sequence numbers in 
Kinesis). 

Review Comment:
   ```suggestion
   Resets the specified supervisor. This endpoint clears all stored offsets in 
Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data 
reading. The supervisor restarts from the earliest or latest available 
position, depending on the platform: offsets in Kafka or sequence numbers in 
Kinesis. 
   ```
   Avoid future tense. Avoid parens.



##########
docs/api-reference/supervisor-api.md:
##########
@@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Creates a new supervisor or updates an existing one for the same datasource 
with a new schema and configuration.
 
-You can define a supervisor spec for [Apache 
Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec)
 or [Amazon 
Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) 
streaming ingestion methods. Once created, the supervisor persists in the 
metadata database.
+You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database.
+
+The following table lists the properties of a supervisor spec:
+
+|Property|Type|Description|Required|
+|--------|----|-----------|--------|
+|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes|
+|`spec`|Object|The container object for the supervisor configuration.|Yes|
+|`ioConfig`|Object|The I/O configuration object to define the connection and 
I/O-related settings for the supervisor and indexing task.|Yes|

Review Comment:
   link to I/O config?



##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -26,45 +26,42 @@ description: "Overview of the Kafka indexing service for 
Druid. Includes example
 
 When you enable the Kafka indexing service, you can configure supervisors on 
the Overlord to manage the creation and lifetime of Kafka indexing tasks.
 
-Kafka indexing tasks read events using Kafka's own partition and offset 
mechanism to guarantee exactly-once ingestion. The supervisor oversees the 
state of the indexing tasks to:
-  - coordinate handoffs
-  - manage failures
-  - ensure that scalability and replication requirements are maintained.
+Kafka indexing tasks read events using Kafka's own partition and offset 
mechanism to guarantee exactly-once ingestion. The supervisor oversees the 
state of the indexing tasks to coordinate handoffs, manage failures, and ensure 
that scalability and replication requirements are maintained.
 
+This topic contains configuration reference information for the Kafka indexing 
service supervisor for Apache Druid.
 
-This topic covers how to submit a supervisor spec to ingest event data, also 
known as message data, from Kafka. See the following for more information:
-- For a reference of Kafka supervisor spec configuration options, see the 
[Kafka supervisor reference](./kafka-supervisor-reference.md).
-- For operations reference information to help run and maintain Apache Kafka 
supervisors, see [Kafka supervisor 
operations](./kafka-supervisor-operations.md).
-- For a walk-through, see the [Loading from Apache 
Kafka](../../tutorials/tutorial-kafka.md) tutorial.
+## Setup
 
-## Kafka support
+To use the Kafka indexing service, you must first load the 
`druid-kafka-indexing-service` extension on both the Overlord and the 
MiddleManager. See [Loading extensions](../../configuration/extensions.md) for 
more information.
+
+### Kafka support
 
 The Kafka indexing service supports transactional topics introduced in Kafka 
0.11.x by default. The consumer for Kafka indexing service is incompatible with 
older Kafka brokers. If you are using an older version, refer to the [Kafka 
upgrade guide](https://kafka.apache.org/documentation/#upgrade).
 
 Additionally, you can set `isolation.level` to `read_uncommitted` in 
`consumerProperties` if either:
 - You don't need Druid to consume transactional topics.
-- You need Druid to consume older versions of Kafka. Make sure offsets are 
sequential, since there is no offset gap check in Druid anymore.
-
-If your Kafka cluster enables consumer-group based ACLs, you can set 
`group.id` in `consumerProperties` to override the default auto generated group 
id.
+- You need Druid to consume older versions of Kafka. Make sure offsets are 
sequential, since there is no offset gap check in Druid.
 
-## Load the Kafka indexing service
+If your Kafka cluster enables consumer-group based ACLs, you can set 
`group.id` in `consumerProperties` to override the default auto generated group 
ID.
 
-To use the Kafka indexing service, load the `druid-kafka-indexing-service` 
extension on both the Overlord and the MiddleManagers. See [Loading 
extensions](../../configuration/extensions.md) for instructions on how to 
configure extensions.
+## Supervisor spec
 
-## Define a supervisor spec
+Similar to the ingestion spec for batch ingestion, the [supervisor 
spec](../../ingestion/supervisor.md#supervisor-spec) configures the data 
ingestion for Kafka streaming ingestion.
 
-Similar to the ingestion spec for batch ingestion, the supervisor spec 
configures the data ingestion for Kafka streaming ingestion. A supervisor spec 
has the following sections:
-- `dataSchema` to specify the Druid datasource name, primary timestamp, 
dimensions, metrics, transforms, and any necessary filters.
-- `ioConfig` to configure Kafka connection settings and configure how Druid 
parses the data. Kafka-specific connection details go in the 
`consumerProperties`. The `ioConfig` is also where you define the input format 
(`inputFormat`) of your Kafka data. For supported formats for Kafka and 
information on how to configure the input format, see [Data 
formats](../../ingestion/data-formats.md).  
-- `tuningConfig` to control various tuning parameters specific to each 
ingestion method.
-For a full description of all the fields and parameters in a Kafka supervisor 
spec, see the [Kafka supervisor reference](./kafka-supervisor-reference.md).
+The following table outlines the high-level configuration options for the 
Kafka supervisor spec:
 
+|Property|Type|Description|Required|
+|--------|----|-----------|--------|
+|`type`|String|The supervisor type; this should always be `kafka`.|Yes|

Review Comment:
   ```suggestion
   |`type`|String|The supervisor type; must be `kafka`.|Yes|
   ```
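   
   Since this table is the reader's first map of the spec, a skeletal example under it might help; a sketch only (the `...` placeholders follow this doc's own convention):
   ```json
   {
     "type": "kafka",
     "spec": {
       "dataSchema": { ... },
       "ioConfig": { ... },
       "tuningConfig": { ... }
     }
   }
   ```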



##########
docs/api-reference/supervisor-api.md:
##########
@@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Creates a new supervisor or updates an existing one for the same datasource 
with a new schema and configuration.
 
-You can define a supervisor spec for [Apache 
Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec)
 or [Amazon 
Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) 
streaming ingestion methods. Once created, the supervisor persists in the 
metadata database.
+You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database.
+
+The following table lists the properties of a supervisor spec:
+
+|Property|Type|Description|Required|
+|--------|----|-----------|--------|
+|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes|
+|`spec`|Object|The container object for the supervisor configuration.|Yes|
+|`ioConfig`|Object|The I/O configuration object to define the connection and 
I/O-related settings for the supervisor and indexing task.|Yes|
+|`dataSchema`|Object|The schema for the indexing task to use during ingestion. 
See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more 
information.|Yes|
+|`tuningConfig`|Object|The tuning configuration object to define 
performance-related settings for the supervisor and indexing tasks.|No|
 
 When you call this endpoint on an existing supervisor for the same datasource, 
the running supervisor signals its tasks to stop reading and begin publishing, 
exiting itself. Druid then uses the provided configuration from the request 
body to create a new supervisor. Druid submits a new schema while retaining 
existing publishing tasks and starts new tasks at the previous task offsets.
+In this way, configuration changes can be applied without requiring any pause 
in ingestion.

Review Comment:
   ```suggestion
   This way, you can apply configuration changes without a pause in ingestion.
   ```
   consider active voice



##########
docs/api-reference/supervisor-api.md:
##########
@@ -3216,7 +3410,7 @@ Content-Type: application/json
 }
 ```
 
-The above operation will reset offsets only for partitions 0 and 2 to 100 and 
650 respectively. After a successful reset,
+The above operation will reset offsets only for partitions `0` and `2` to 100 
and 650 respectively. After a successful reset,

Review Comment:
   ```suggestion
   The example operation resets offsets only for partitions `0` and `2` to 100 
and 650 respectively. After a successful reset,
   ```
   Avoid future, avoid location above/below indicators



##########
docs/configuration/index.md:
##########
@@ -1159,7 +1159,7 @@ If autoscaling is enabled, you can set these additional 
configs:
 |`druid.supervisor.idleConfig.enabled`|If `true`, supervisor can become idle 
if there is no data on input stream/topic for some time.|false|
 |`druid.supervisor.idleConfig.inactiveAfterMillis`|Supervisor is marked as 
idle if all existing data has been read from input topic and no new data has 
been published for `inactiveAfterMillis` milliseconds.|`600_000`|
 
-The `druid.supervisor.idleConfig.*` specified in the Overlord runtime 
properties defines the default behavior for the entire cluster. See [Idle 
Configuration in Kafka Supervisor 
IOConfig](../development/extensions-core/kafka-supervisor-reference.md#supervisor-io-configuration)
 to override it for an individual supervisor.
+The `druid.supervisor.idleConfig.*` specified in the Overlord runtime 
properties defines the default behavior for the entire cluster. See [Idle 
Configuration in Kafka Supervisor 
IOConfig](../development/extensions-core/kinesis-ingestion.md#io-configuration) 
to override it for an individual supervisor.

Review Comment:
   ```suggestion
   The `druid.supervisor.idleConfig.*` specification in the Overlord runtime 
properties defines the default behavior for the entire cluster. See [Idle 
Configuration in Kafka Supervisor 
IOConfig](../development/extensions-core/kinesis-ingestion.md#io-configuration) 
to override it for an individual supervisor.
   ```
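   
   An example of the cluster-wide default might also help; in the Overlord runtime properties it would look roughly like this (values illustrative, property names from the table above):
   ```
   # Overlord runtime.properties: cluster-wide idle defaults
   druid.supervisor.idleConfig.enabled=true
   druid.supervisor.idleConfig.inactiveAfterMillis=600000
   ```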



##########
docs/api-reference/supervisor-api.md:
##########
@@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT
 
 Creates a new supervisor or updates an existing one for the same datasource 
with a new schema and configuration.
 
-You can define a supervisor spec for [Apache 
Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec)
 or [Amazon 
Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) 
streaming ingestion methods. Once created, the supervisor persists in the 
metadata database.
+You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database.
+
+The following table lists the properties of a supervisor spec:
+
+|Property|Type|Description|Required|
+|--------|----|-----------|--------|
+|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes|
+|`spec`|Object|The container object for the supervisor configuration.|Yes|
+|`ioConfig`|Object|The I/O configuration object to define the connection and 
I/O-related settings for the supervisor and indexing task.|Yes|
+|`dataSchema`|Object|The schema for the indexing task to use during ingestion. 
See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more 
information.|Yes|
+|`tuningConfig`|Object|The tuning configuration object to define 
performance-related settings for the supervisor and indexing tasks.|No|
 
 When you call this endpoint on an existing supervisor for the same datasource, 
the running supervisor signals its tasks to stop reading and begin publishing, 
exiting itself. Druid then uses the provided configuration from the request 
body to create a new supervisor. Druid submits a new schema while retaining 
existing publishing tasks and starts new tasks at the previous task offsets.
+In this way, configuration changes can be applied without requiring any pause 
in ingestion.
+
+You can achieve seamless schema migrations by submitting the new schema using 
the `/druid/indexer/v1/supervisor` endpoint.

Review Comment:
   ```suggestion
   Submit updated schemas for a datasource to the 
`/druid/indexer/v1/supervisor` endpoint to achieve seamless schema migrations.
   ```
   This seems a little redundant with line 2231. If you keep it, consider active voice. "seamless" is setting off an amber flag for me, but I'm not opposed to it.



##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec 
for Kafka that uses the `JS
 }
 ```
 
-### Kafka input format supervisor spec example
-
-If you want to parse the Kafka metadata fields in addition to the Kafka 
payload value contents, you can use the `kafka` input format.
-
-The `kafka` input format wraps around the payload parsing input format and 
augments the data it outputs with the Kafka event timestamp,
-the Kafka topic name, the Kafka event headers, and the key field that itself 
can be parsed using any available InputFormat.
-
-For example, consider the following structure for a Kafka message that 
represents a fictitious wiki edit in a development environment:
-
-- **Kafka timestamp**: `1680795276351`
-- **Kafka topic**: `wiki-edits`
-- **Kafka headers**:
-  - `env=development`
-  - `zone=z1`
-- **Kafka key**: `wiki-edit`
-- **Kafka payload value**: 
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo 
Toraut","delta":31,"namespace":"Main"}`
-
-Using `{ "type": "json" }` as the input format would only parse the payload 
value.
-To parse the Kafka metadata in addition to the payload, use the `kafka` input 
format.
-
-You would configure it as follows:
-
-- `valueFormat`: Define how to parse the payload value. Set this to the 
payload parsing input format (`{ "type": "json" }`).
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the 
Druid schema to avoid conflicts with columns from the payload. The default is 
`kafka.timestamp`.
-- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid 
schema to avoid conflicts with columns from the payload. The default is 
`kafka.topic`. This field is useful when ingesting data from multiple topics 
into same datasource.
-- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding 
from the Kafka header.
-   Other supported encoding formats include the following:
-   - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
-   - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin 
block of the Unicode character set.
-   - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by 
an optional byte-order mark.
-   - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order.
-   - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte 
order.
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any 
conflicts with columns from the payload. The default is `kafka.header.`.
-  Considering the header from the example, Druid maps the headers to the 
following columns: `kafka.header.env`, `kafka.header.zone`.
-- `keyFormat`: Supply an input format to parse the key. Only the first value 
will be used.
-  If, as in the example, your key values are simple strings, then you can use 
the `tsv` format to parse them.
-  ```
-  {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  } 
-  ```
-  Note that for `tsv`,`csv`, and `regex` formats, you need to provide a 
`columns` array to make a valid input format. Only the first one is used, and 
its name will be ignored in favor of `keyColumnName`.
-- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts 
with columns from the payload. The default is `kafka.key`.
+</details>
 
-Putting it together, the following input format (that uses the default values 
for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and 
`keyColumnName`)
+### I/O configuration
 
-```json
-{
-  "type": "kafka",
-  "valueFormat": {
-    "type": "json"
-  },
-  "headerFormat": {
-    "type": "string"
-  },
-  "keyFormat": {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  }
-}
-```
+The following table outlines the configuration options for `ioConfig`:
 
-would parse the example message as follows:
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`topic`|String|The Kafka topic to read from. Must be a specific topic. Druid 
does not support topic patterns. To ingest data from multiple topic, see 
[Ingest from multiple topics](#ingest-from-multiple-topics). |Yes||
+|`inputFormat`|Object|The [input 
format](../../ingestion/data-formats.md#input-format) to define input data 
parsing.|Yes||
+|`consumerProperties`|String, Object|A map of properties to pass to the Kafka 
consumer. See [Consumer properties](#consumer-properties) for details.|Yes||
+|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll 
records, in milliseconds.|No|100|
+|`replicas`|Integer|The number of replica sets, where 1 is a single set of 
tasks (no replication). Druid always assigns replicate tasks to different 
workers to provide resiliency against process failure.|No|1|
+|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The 
maximum number of reading tasks equals `taskCount * replicas`. The total number 
of tasks, reading and publishing, is greater than this count. See [Capacity 
planning](../../ingestion/supervisor.md#capacity-planning) for more details. 
When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is 
less than the `taskCount` value.|No|1|
+|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading 
and begin publishing segments.|No|PT1H|
+|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts 
managing tasks.|No|PT5S|
+|`period`|ISO 8601 period|Determines how often the supervisor executes its 
management logic. Note that the supervisor also runs in response to certain 
events, such as tasks succeeding, failing, and reaching their task duration. 
The `period` value specifies the maximum time between iterations.|No|PT30S|
+|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the 
first time, it obtains a set of starting offsets from Kafka. This flag 
determines whether it retrieves the earliest or latest offsets in Kafka. Under 
normal circumstances, subsequent tasks start from where the previous segments 
ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`|
+|`completionTimeout`|ISO 8601 period|The length of time to wait before 
declaring a publishing task as failed and terminating it. If the value is too 
low, your tasks may never publish. The publishing clock for a task begins 
roughly after `taskDuration` elapses.|No|PT30M|
+|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to 
reject messages with timestamps earlier than this date time. For example, if 
this property is set to `2016-01-01T11:00Z` and the supervisor creates a task 
at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments, such as a realtime and a nightly batch ingestion pipeline.|No||
+|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps earlier than this period before the task was created. 
For example, if this property is set to `PT1H` and the supervisor creates a 
task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This may help prevent concurrency issues if your data 
stream has late messages and you have multiple pipelines that need to operate 
on the same segments, such as a realtime and a nightly batch ingestion 
pipeline. Note that you can specify only one of the late message rejection 
properties.|No||
+|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps later than this period after the task reached its task 
duration. For example, if this property is set to `PT1H`, the task duration is 
set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid 
drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes 
run past their task duration, such as in cases of supervisor failover. Setting 
`earlyMessageRejectionPeriod` too low may cause Druid to drop messages 
unexpectedly whenever a task runs past its originally configured task 
duration.|No||
+|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. 
See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more 
information.|No|null|
+|`idleConfig`|Object|Defines how and when the Kafka supervisor can become 
idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for 
more details.|No|null|
 
-```json
-{
-  "channel": "#sv.wikipedia",
-  "timestamp": "2016-06-27T00:00:11.080Z",
-  "page": "Salo Toraut",
-  "delta": 31,
-  "namespace": "Main",
-  "kafka.timestamp": 1680795276351,
-  "kafka.topic": "wiki-edits",
-  "kafka.header.env": "development",
-  "kafka.header.zone": "z1",
-  "kafka.key": "wiki-edit"
-}
-```
+#### Consumer properties
 
-For more information on data formats, see [Data 
formats](../../ingestion/data-formats.md).
+Consumer properties must contain a property `bootstrap.servers` with a list of 
Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.
+By default, `isolation.level` is set to `read_committed`. If you use older 
versions of Kafka servers without transactions support or don't want Druid to 
consume only committed transactions, set `isolation.level` to 
`read_uncommitted`.
 
-Finally, add these Kafka metadata columns to the `dimensionsSpec` or  set your 
`dimensionsSpec` to auto-detect columns.
-     
-The following supervisor spec demonstrates how to ingest the Kafka header, 
key, timestamp, and topic into Druid dimensions:
+In some cases, you may need to fetch consumer properties at runtime. For 
example, when `bootstrap.servers` is not known upfront, or is not static. To 
enable SSL connections, you must provide passwords for `keystore`, 
`truststore`, and `key` secretly. You can provide configurations at runtime 
with a dynamic config provider implementation like the environment variable 
config provider that comes with Druid. For more information, see [Dynamic 
config provider](../../operations/dynamic-config-provider.md).
+
+For example, if you are using SASL and SSL with Kafka, set the following 
environment variables for the Druid user on the machines running the Overlord 
and the Peon services:
 
 ```
-{
-  "type": "kafka",
-  "spec": {
-    "ioConfig": {
-      "type": "kafka",
-      "consumerProperties": {
-        "bootstrap.servers": "localhost:9092"
-      },
-      "topic": "wiki-edits",
-      "inputFormat": {
-        "type": "kafka",
-        "valueFormat": {
-          "type": "json"
-        },
-        "headerFormat": {
-          "type": "string"
-        },
-        "keyFormat": {
-          "type": "tsv",
-          "findColumnsFromHeader": false,
-          "columns": ["x"]
-        }
-      },
-      "useEarliestOffset": true
-    },
-    "dataSchema": {
-      "dataSource": "wikiticker",
-      "timestampSpec": {
-        "column": "timestamp",
-        "format": "posix"
-      },
-      "dimensionsSpec":  "dimensionsSpec": {
-        "useSchemaDiscovery": true,
-        "includeAllDimensions": true
-      },
-      "granularitySpec": {
-        "queryGranularity": "none",
-        "rollup": false,
-        "segmentGranularity": "day"
+export 
KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule 
required username='admin_user' password='admin_password';"
+export SSL_KEY_PASSWORD=mysecretkeypassword
+export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword
+export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword
+```
+
+```json
+  "druid.dynamic.config.provider": {
+    "type": "environment",
+     "variables": {
+        "sasl.jaas.config": "KAFKA_JAAS_CONFIG",
+        "ssl.key.password": "SSL_KEY_PASSWORD",
+        "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
+        "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD"
       }
-    },
-    "tuningConfig": {
-      "type": "kafka"
-    }
   }
-}
 ```
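
Note that this provider entry is not a standalone runtime property; in a supervisor spec it sits inside the `consumerProperties` map. A sketch, assuming a SASL_SSL setup (the `security.protocol` value is an assumption, not part of this PR):

```json
"consumerProperties": {
  "bootstrap.servers": "localhost:9092",
  "security.protocol": "SASL_SSL",
  "druid.dynamic.config.provider": {
    "type": "environment",
    "variables": {
      "sasl.jaas.config": "KAFKA_JAAS_CONFIG",
      "ssl.key.password": "SSL_KEY_PASSWORD",
      "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
      "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD"
    }
  }
}
```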
 
-After Druid ingests the data, you can query the Kafka metadata columns as 
follows:
+Verify that you've changed the values for all configurations to match your own environment. In the Druid data loader interface, you can use the environment variable config provider syntax in the **Consumer properties** field on the **Connect** tab. When connecting to Kafka, Druid replaces the environment variables with their corresponding values.
 
-```sql
-SELECT
-  "kafka.header.env",
-  "kafka.key",
-  "kafka.timestamp",
-  "kafka.topic"
-FROM "wikiticker"
-```
+#### Task autoscaler
 
-This query returns:
+You can optionally configure autoscaling behavior for ingestion tasks using 
the `autoScalerConfig` property of the `ioConfig` object.
 
-| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` |
-|--------------------|-----------|---------------|---------------|
-| `development`      | `wiki-edit` | `1680795276351` | `wiki-edits`  |
+The following table outlines the configuration options for `autoScalerConfig`:
 
-For more information, see [`kafka` data 
format](../../ingestion/data-formats.md#kafka).
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`enableTaskAutoScaler`|Enables the auto scaler. If not specified or set to `false`, Druid disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`|
+|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= 
taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales 
reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is 
ignored.|Yes||
+|`taskCountMin`|Minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and initially launches `taskCountMin` tasks.|Yes||
+|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two 
scale actions.| No|600000|
+|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the 
`lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more 
information.|No|`lagBased`|
 
-## Submit a supervisor spec
+##### Autoscaler strategy
 
-Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor 
spec to the following endpoint:
+The following table outlines the configuration options for 
`autoScalerStrategy`:
 
-`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`lagCollectionIntervalMillis`|The time period during which Druid collects lag 
metric points.|No|30000|
+|`lagCollectionRangeMillis`|The total time window of lag collection. Use with 
`lagCollectionIntervalMillis` to specify the intervals at which to collect lag 
metric points.|No|600000|
+|`scaleOutThreshold`|The lag threshold that triggers the scale out action.|No|6000000|
+|`triggerScaleOutFractionThreshold`|Triggers a scale out action if the fraction of lag points higher than `scaleOutThreshold` exceeds this value.|No|0.3|
+|`scaleInThreshold`|The lag threshold that triggers the scale in action.|No|1000000|
+|`triggerScaleInFractionThreshold`|Triggers a scale in action if the fraction of lag points lower than `scaleInThreshold` exceeds this value.|No|0.9|
+|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the 
supervisor starts before the first scale logic check.|No|300000|
+|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered.|No|60000|
+|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
+|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
 
-For example: 
+#### Idle supervisor configuration
 
-```
-curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json 
http://localhost:8090/druid/indexer/v1/supervisor
-```
+:::info
+Idle state transitioning is currently designated as experimental.
+:::
+
+When the supervisor enters the idle state, it launches no new tasks after the currently executing tasks complete. This can reduce costs for cluster operators when topics receive data only sporadically.
+
+The following table outlines the configuration options for `idleConfig`:
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|`enabled`|If `true`, the supervisor becomes idle if there is no data on the input stream or topic for some time.|No|`false`|
+|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has been read from the input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|No|`600_000`|
+
+The following example shows a supervisor spec with `lagBased` autoscaler and 
idle configuration enabled:
+
+<details>
+  <summary>Click to view the example</summary>
 
-Where the file `supervisor-spec.json` contains your Kafka supervisor spec file.
+```json
+{
+    "type": "kafka",
+    "spec": {
+      "dataSchema": {
+        ...
+      },
+      "ioConfig": {
+         "topic": "metrics",
+         "inputFormat": {
+             "type": "json"
+         },
+          "consumerProperties": {
+             "bootstrap.servers": "localhost:9092"
+          },
+         "autoScalerConfig": {
+             "enableTaskAutoScaler": true,
+              "taskCountMax": 6,
+              "taskCountMin": 2,
+             "minTriggerScaleActionFrequencyMillis": 600000,
+             "autoScalerStrategy": "lagBased",
+              "lagCollectionIntervalMillis": 30000,
+              "lagCollectionRangeMillis": 600000,
+              "scaleOutThreshold": 6000000,
+              "triggerScaleOutFractionThreshold": 0.3,
+             "scaleInThreshold": 1000000,
+             "triggerScaleInFractionThreshold": 0.9,
+              "scaleActionStartDelayMillis": 300000,
+              "scaleActionPeriodMillis": 60000,
+              "scaleInStep": 1,
+             "scaleOutStep": 2
+         },
+         "taskCount":1,
+         "replicas":1,
+         "taskDuration":"PT1H",
+         "idleConfig": {
+           "enabled": true,
+           "inactiveAfterMillis": 600000 
+         }
+      },
+     "tuningConfig":{
+        ...
+     }
+    }
+}
+```
+</details>
+
+#### Ingest from multiple topics
+
+:::info
+If you enable multi-topic ingestion for a datasource, downgrading to a version 
older than
+28.0.0 will cause the ingestion for that datasource to fail.
+:::
+
+To ingest data from multiple topics, set `topicPattern` instead of `topic` in the supervisor `ioConfig` object.
+You can pass a regex pattern that matches multiple topics as the value for `topicPattern` in `ioConfig`. For example, to
+ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioConfig`.
+Similarly, you can use `metrics-.*` as the value for `topicPattern` if you 
want to ingest from all the topics that
+start with `metrics-`. If new topics are added to the cluster that match the 
regex, Druid automatically starts

Review Comment:
   ```suggestion
   start with `metrics-`. If you add a new topic that matches the regex to the cluster, Druid automatically starts
   ```



##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec 
for Kafka that uses the `JS
 }
 ```
 
-### Kafka input format supervisor spec example
-
-If you want to parse the Kafka metadata fields in addition to the Kafka 
payload value contents, you can use the `kafka` input format.
-
-The `kafka` input format wraps around the payload parsing input format and 
augments the data it outputs with the Kafka event timestamp,
-the Kafka topic name, the Kafka event headers, and the key field that itself 
can be parsed using any available InputFormat.
-
-For example, consider the following structure for a Kafka message that 
represents a fictitious wiki edit in a development environment:
-
-- **Kafka timestamp**: `1680795276351`
-- **Kafka topic**: `wiki-edits`
-- **Kafka headers**:
-  - `env=development`
-  - `zone=z1`
-- **Kafka key**: `wiki-edit`
-- **Kafka payload value**: 
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo 
Toraut","delta":31,"namespace":"Main"}`
-
-Using `{ "type": "json" }` as the input format would only parse the payload 
value.
-To parse the Kafka metadata in addition to the payload, use the `kafka` input 
format.
-
-You would configure it as follows:
-
-- `valueFormat`: Define how to parse the payload value. Set this to the 
payload parsing input format (`{ "type": "json" }`).
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the 
Druid schema to avoid conflicts with columns from the payload. The default is 
`kafka.timestamp`.
-- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid 
schema to avoid conflicts with columns from the payload. The default is 
`kafka.topic`. This field is useful when ingesting data from multiple topics 
into same datasource.
-- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding 
from the Kafka header.
-   Other supported encoding formats include the following:
-   - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
-   - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin 
block of the Unicode character set.
-   - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by 
an optional byte-order mark.
-   - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order.
-   - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte 
order.
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any 
conflicts with columns from the payload. The default is `kafka.header.`.
-  Considering the header from the example, Druid maps the headers to the 
following columns: `kafka.header.env`, `kafka.header.zone`.
-- `keyFormat`: Supply an input format to parse the key. Only the first value 
will be used.
-  If, as in the example, your key values are simple strings, then you can use 
the `tsv` format to parse them.
-  ```
-  {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  } 
-  ```
-  Note that for `tsv`,`csv`, and `regex` formats, you need to provide a 
`columns` array to make a valid input format. Only the first one is used, and 
its name will be ignored in favor of `keyColumnName`.
-- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts 
with columns from the payload. The default is `kafka.key`.
+</details>
 
-Putting it together, the following input format (that uses the default values 
for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and 
`keyColumnName`)
+### I/O configuration
 
-```json
-{
-  "type": "kafka",
-  "valueFormat": {
-    "type": "json"
-  },
-  "headerFormat": {
-    "type": "string"
-  },
-  "keyFormat": {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  }
-}
-```
+The following table outlines the configuration options for `ioConfig`:
 
-would parse the example message as follows:
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`topic`|String|The Kafka topic to read from. Must be a specific topic; the `topic` property does not accept patterns. To ingest data from multiple topics, see [Ingest from multiple topics](#ingest-from-multiple-topics).|Yes||
+|`inputFormat`|Object|The [input 
format](../../ingestion/data-formats.md#input-format) to define input data 
parsing.|Yes||
+|`consumerProperties`|String, Object|A map of properties to pass to the Kafka 
consumer. See [Consumer properties](#consumer-properties) for details.|Yes||
+|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll 
records, in milliseconds.|No|100|
+|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replica tasks to different workers to provide resiliency against process failure.|No|1|
+|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The 
maximum number of reading tasks equals `taskCount * replicas`. The total number 
of tasks, reading and publishing, is greater than this count. See [Capacity 
planning](../../ingestion/supervisor.md#capacity-planning) for more details. 
When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is 
less than the `taskCount` value.|No|1|
+|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading 
and begin publishing segments.|No|PT1H|
+|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts 
managing tasks.|No|PT5S|
+|`period`|ISO 8601 period|Determines how often the supervisor executes its 
management logic. Note that the supervisor also runs in response to certain 
events, such as tasks succeeding, failing, and reaching their task duration. 
The `period` value specifies the maximum time between iterations.|No|PT30S|
+|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the 
first time, it obtains a set of starting offsets from Kafka. This flag 
determines whether it retrieves the earliest or latest offsets in Kafka. Under 
normal circumstances, subsequent tasks start from where the previous segments 
ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`|
+|`completionTimeout`|ISO 8601 period|The length of time to wait before 
declaring a publishing task as failed and terminating it. If the value is too 
low, your tasks may never publish. The publishing clock for a task begins 
roughly after `taskDuration` elapses.|No|PT30M|
+|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to 
reject messages with timestamps earlier than this date time. For example, if 
this property is set to `2016-01-01T11:00Z` and the supervisor creates a task 
at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments, such as a realtime and a nightly batch ingestion pipeline.|No||
+|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps earlier than this period before the task was created. 
For example, if this property is set to `PT1H` and the supervisor creates a 
task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This may help prevent concurrency issues if your data 
stream has late messages and you have multiple pipelines that need to operate 
on the same segments, such as a realtime and a nightly batch ingestion 
pipeline. Note that you can specify only one of the late message rejection 
properties.|No||
+|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps later than this period after the task reached its task 
duration. For example, if this property is set to `PT1H`, the task duration is 
set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid 
drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes 
run past their task duration, such as in cases of supervisor failover. Setting 
`earlyMessageRejectionPeriod` too low may cause Druid to drop messages 
unexpectedly whenever a task runs past its originally configured task 
duration.|No||
+|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. 
See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more 
information.|No|null|
+|`idleConfig`|Object|Defines how and when the Kafka supervisor can become 
idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for 
more details.|No|null|
 
-```json
-{
-  "channel": "#sv.wikipedia",
-  "timestamp": "2016-06-27T00:00:11.080Z",
-  "page": "Salo Toraut",
-  "delta": 31,
-  "namespace": "Main",
-  "kafka.timestamp": 1680795276351,
-  "kafka.topic": "wiki-edits",
-  "kafka.header.env": "development",
-  "kafka.header.zone": "z1",
-  "kafka.key": "wiki-edit"
-}
-```
+#### Consumer properties
 
-For more information on data formats, see [Data 
formats](../../ingestion/data-formats.md).
+Consumer properties must contain a property `bootstrap.servers` with a list of 
Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.
+By default, `isolation.level` is set to `read_committed`. If you use older 
versions of Kafka servers without transactions support or don't want Druid to 
consume only committed transactions, set `isolation.level` to 
`read_uncommitted`.
 
-Finally, add these Kafka metadata columns to the `dimensionsSpec` or  set your 
`dimensionsSpec` to auto-detect columns.
-     
-The following supervisor spec demonstrates how to ingest the Kafka header, 
key, timestamp, and topic into Druid dimensions:
+In some cases, you may need to fetch consumer properties at runtime, for example, when `bootstrap.servers` is not known upfront or is not static. To enable SSL connections, you must provide the passwords for the `keystore`, `truststore`, and `key` without exposing them in plain text. You can supply these configurations at runtime with a dynamic config provider implementation, such as the environment variable config provider that comes with Druid. For more information, see [Dynamic config provider](../../operations/dynamic-config-provider.md).
+
+For example, if you are using SASL and SSL with Kafka, set the following 
environment variables for the Druid user on the machines running the Overlord 
and the Peon services:
 
 ```
-{
-  "type": "kafka",
-  "spec": {
-    "ioConfig": {
-      "type": "kafka",
-      "consumerProperties": {
-        "bootstrap.servers": "localhost:9092"
-      },
-      "topic": "wiki-edits",
-      "inputFormat": {
-        "type": "kafka",
-        "valueFormat": {
-          "type": "json"
-        },
-        "headerFormat": {
-          "type": "string"
-        },
-        "keyFormat": {
-          "type": "tsv",
-          "findColumnsFromHeader": false,
-          "columns": ["x"]
-        }
-      },
-      "useEarliestOffset": true
-    },
-    "dataSchema": {
-      "dataSource": "wikiticker",
-      "timestampSpec": {
-        "column": "timestamp",
-        "format": "posix"
-      },
-      "dimensionsSpec":  "dimensionsSpec": {
-        "useSchemaDiscovery": true,
-        "includeAllDimensions": true
-      },
-      "granularitySpec": {
-        "queryGranularity": "none",
-        "rollup": false,
-        "segmentGranularity": "day"
+export 
KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule 
required username='admin_user' password='admin_password';"
+export SSL_KEY_PASSWORD=mysecretkeypassword
+export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword
+export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword
+```
+
+```json
+  "druid.dynamic.config.provider": {
+    "type": "environment",
+     "variables": {
+        "sasl.jaas.config": "KAFKA_JAAS_CONFIG",
+        "ssl.key.password": "SSL_KEY_PASSWORD",
+        "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
+        "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD"
       }
-    },
-    "tuningConfig": {
-      "type": "kafka"
-    }
   }
-}
 ```
 
-After Druid ingests the data, you can query the Kafka metadata columns as 
follows:
+Verify that you've changed the values for all configurations to match your own environment. In the Druid data loader interface, you can use the environment variable config provider syntax in the **Consumer properties** field on the **Connect** tab. When connecting to Kafka, Druid replaces the environment variables with their corresponding values.
 
-```sql
-SELECT
-  "kafka.header.env",
-  "kafka.key",
-  "kafka.timestamp",
-  "kafka.topic"
-FROM "wikiticker"
-```
+#### Task autoscaler
 
-This query returns:
+You can optionally configure autoscaling behavior for ingestion tasks using 
the `autoScalerConfig` property of the `ioConfig` object.
 
-| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` |
-|--------------------|-----------|---------------|---------------|
-| `development`      | `wiki-edit` | `1680795276351` | `wiki-edits`  |
+The following table outlines the configuration options for `autoScalerConfig`:
 
-For more information, see [`kafka` data 
format](../../ingestion/data-formats.md#kafka).
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`enableTaskAutoScaler`|Enables the auto scaler. If not specified or set to `false`, Druid disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`|
+|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= 
taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales 
reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is 
ignored.|Yes||
+|`taskCountMin`|Minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and initially launches `taskCountMin` tasks.|Yes||
+|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two 
scale actions.| No|600000|
+|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the 
`lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more 
information.|No|`lagBased`|
 
-## Submit a supervisor spec
+##### Autoscaler strategy
 
-Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor 
spec to the following endpoint:
+The following table outlines the configuration options for 
`autoScalerStrategy`:
 
-`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`lagCollectionIntervalMillis`|The time period during which Druid collects lag 
metric points.|No|30000|
+|`lagCollectionRangeMillis`|The total time window of lag collection. Use with 
`lagCollectionIntervalMillis` to specify the intervals at which to collect lag 
metric points.|No|600000|
+|`scaleOutThreshold`|The lag threshold that triggers the scale out action.|No|6000000|
+|`triggerScaleOutFractionThreshold`|Triggers a scale out action if the fraction of lag points higher than `scaleOutThreshold` exceeds this value.|No|0.3|
+|`scaleInThreshold`|The lag threshold that triggers the scale in action.|No|1000000|
+|`triggerScaleInFractionThreshold`|Triggers a scale in action if the fraction of lag points lower than `scaleInThreshold` exceeds this value.|No|0.9|
+|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the 
supervisor starts before the first scale logic check.|No|300000|
+|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered.|No|60000|
+|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
+|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
 
-For example: 
+#### Idle supervisor configuration
 
-```
-curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json 
http://localhost:8090/druid/indexer/v1/supervisor
-```
+:::info
+Idle state transitioning is currently designated as experimental.
+:::
+
+When the supervisor enters the idle state, it launches no new tasks after the currently executing tasks complete. This can reduce costs for cluster operators when topics receive data only sporadically.
+
+The following table outlines the configuration options for `idleConfig`:
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|`enabled`|If `true`, the supervisor becomes idle if there is no data on the input stream or topic for some time.|No|`false`|
+|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has been read from the input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|No|`600_000`|
+
+The following example shows a supervisor spec with `lagBased` autoscaler and 
idle configuration enabled:
+
+<details>
+  <summary>Click to view the example</summary>
 
-Where the file `supervisor-spec.json` contains your Kafka supervisor spec file.
+```json
+{
+    "type": "kafka",
+    "spec": {
+      "dataSchema": {
+        ...
+      },
+      "ioConfig": {
+         "topic": "metrics",
+         "inputFormat": {
+             "type": "json"
+         },
+          "consumerProperties": {
+             "bootstrap.servers": "localhost:9092"
+          },
+         "autoScalerConfig": {
+             "enableTaskAutoScaler": true,
+              "taskCountMax": 6,
+              "taskCountMin": 2,
+             "minTriggerScaleActionFrequencyMillis": 600000,
+             "autoScalerStrategy": "lagBased",
+              "lagCollectionIntervalMillis": 30000,
+              "lagCollectionRangeMillis": 600000,
+              "scaleOutThreshold": 6000000,
+              "triggerScaleOutFractionThreshold": 0.3,
+             "scaleInThreshold": 1000000,
+             "triggerScaleInFractionThreshold": 0.9,
+              "scaleActionStartDelayMillis": 300000,
+              "scaleActionPeriodMillis": 60000,
+              "scaleInStep": 1,
+             "scaleOutStep": 2
+         },
+         "taskCount":1,
+         "replicas":1,
+         "taskDuration":"PT1H",
+         "idleConfig": {
+           "enabled": true,
+           "inactiveAfterMillis": 600000 
+         }
+      },
+     "tuningConfig":{
+        ...
+     }
+    }
+}
+```
+</details>
+
+#### Ingest from multiple topics
+
+:::info
+If you enable multi-topic ingestion for a datasource, downgrading to a version 
older than
+28.0.0 will cause the ingestion for that datasource to fail.
+:::
+
+To ingest data from multiple topics, set `topicPattern` instead of `topic` in the supervisor `ioConfig` object.
+You can pass a regex pattern that matches multiple topics as the value for `topicPattern` in `ioConfig`. For example, to
+ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioConfig`.
+Similarly, you can use `metrics-.*` as the value for `topicPattern` if you 
want to ingest from all the topics that
+start with `metrics-`. If new topics are added to the cluster that match the 
regex, Druid automatically starts
+ingesting from those new topics. A topic name that only partially matches the regex, such as `my-metrics-12`, is not
+included for ingestion.
+
+When ingesting data from multiple topics, partitions are assigned based on the 
hashcode of the topic name and the

Review Comment:
   ```suggestion
   When ingesting data from multiple topics, Druid assigns partitions based on 
the hashcode of the topic name and the
   ```



##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec 
for Kafka that uses the `JS
 }
 ```
 
-### Kafka input format supervisor spec example
-
-If you want to parse the Kafka metadata fields in addition to the Kafka 
payload value contents, you can use the `kafka` input format.
-
-The `kafka` input format wraps around the payload parsing input format and 
augments the data it outputs with the Kafka event timestamp,
-the Kafka topic name, the Kafka event headers, and the key field that itself 
can be parsed using any available InputFormat.
-
-For example, consider the following structure for a Kafka message that 
represents a fictitious wiki edit in a development environment:
-
-- **Kafka timestamp**: `1680795276351`
-- **Kafka topic**: `wiki-edits`
-- **Kafka headers**:
-  - `env=development`
-  - `zone=z1`
-- **Kafka key**: `wiki-edit`
-- **Kafka payload value**: 
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo 
Toraut","delta":31,"namespace":"Main"}`
-
-Using `{ "type": "json" }` as the input format would only parse the payload 
value.
-To parse the Kafka metadata in addition to the payload, use the `kafka` input 
format.
-
-You would configure it as follows:
-
-- `valueFormat`: Define how to parse the payload value. Set this to the 
payload parsing input format (`{ "type": "json" }`).
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the 
Druid schema to avoid conflicts with columns from the payload. The default is 
`kafka.timestamp`.
-- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid 
schema to avoid conflicts with columns from the payload. The default is 
`kafka.topic`. This field is useful when ingesting data from multiple topics 
into same datasource.
-- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding 
from the Kafka header.
-   Other supported encoding formats include the following:
-   - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
-   - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin 
block of the Unicode character set.
-   - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by 
an optional byte-order mark.
-   - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order.
-   - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte 
order.
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any 
conflicts with columns from the payload. The default is `kafka.header.`.
-  Considering the header from the example, Druid maps the headers to the 
following columns: `kafka.header.env`, `kafka.header.zone`.
-- `keyFormat`: Supply an input format to parse the key. Only the first value 
will be used.
-  If, as in the example, your key values are simple strings, then you can use 
the `tsv` format to parse them.
-  ```
-  {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  } 
-  ```
-  Note that for `tsv`,`csv`, and `regex` formats, you need to provide a 
`columns` array to make a valid input format. Only the first one is used, and 
its name will be ignored in favor of `keyColumnName`.
-- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts 
with columns from the payload. The default is `kafka.key`.
+</details>
 
-Putting it together, the following input format (that uses the default values 
for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and 
`keyColumnName`)
+### I/O configuration
 
-```json
-{
-  "type": "kafka",
-  "valueFormat": {
-    "type": "json"
-  },
-  "headerFormat": {
-    "type": "string"
-  },
-  "keyFormat": {
-    "type": "tsv",
-    "findColumnsFromHeader": false,
-    "columns": ["x"]
-  }
-}
-```
+The following table outlines the configuration options for `ioConfig`:
 
-would parse the example message as follows:
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`topic`|String|The Kafka topic to read from. Must be a specific topic; the `topic` property does not accept patterns. To ingest data from multiple topics, see [Ingest from multiple topics](#ingest-from-multiple-topics).|Yes||
+|`inputFormat`|Object|The [input 
format](../../ingestion/data-formats.md#input-format) to define input data 
parsing.|Yes||
+|`consumerProperties`|String, Object|A map of properties to pass to the Kafka 
consumer. See [Consumer properties](#consumer-properties) for details.|Yes||
+|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll 
records, in milliseconds.|No|100|
+|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replica tasks to different workers to provide resiliency against process failure.|No|1|
+|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The 
maximum number of reading tasks equals `taskCount * replicas`. The total number 
of tasks, reading and publishing, is greater than this count. See [Capacity 
planning](../../ingestion/supervisor.md#capacity-planning) for more details. 
When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is 
less than the `taskCount` value.|No|1|
+|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading 
and begin publishing segments.|No|PT1H|
+|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts 
managing tasks.|No|PT5S|
+|`period`|ISO 8601 period|Determines how often the supervisor executes its 
management logic. Note that the supervisor also runs in response to certain 
events, such as tasks succeeding, failing, and reaching their task duration. 
The `period` value specifies the maximum time between iterations.|No|PT30S|
+|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the 
first time, it obtains a set of starting offsets from Kafka. This flag 
determines whether it retrieves the earliest or latest offsets in Kafka. Under 
normal circumstances, subsequent tasks start from where the previous segments 
ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`|
+|`completionTimeout`|ISO 8601 period|The length of time to wait before 
declaring a publishing task as failed and terminating it. If the value is too 
low, your tasks may never publish. The publishing clock for a task begins 
roughly after `taskDuration` elapses.|No|PT30M|
+|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to 
reject messages with timestamps earlier than this date time. For example, if 
this property is set to `2016-01-01T11:00Z` and the supervisor creates a task 
at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments, such as a realtime and a nightly batch ingestion pipeline.|No||
+|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps earlier than this period before the task was created. 
For example, if this property is set to `PT1H` and the supervisor creates a 
task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This may help prevent concurrency issues if your data 
stream has late messages and you have multiple pipelines that need to operate 
on the same segments, such as a realtime and a nightly batch ingestion 
pipeline. Note that you can specify only one of the late message rejection 
properties.|No||
+|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps later than this period after the task reached its task 
duration. For example, if this property is set to `PT1H`, the task duration is 
set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid 
drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes 
run past their task duration, such as in cases of supervisor failover. Setting 
`earlyMessageRejectionPeriod` too low may cause Druid to drop messages 
unexpectedly whenever a task runs past its originally configured task 
duration.|No||
+|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. 
See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more 
information.|No|null|
+|`idleConfig`|Object|Defines how and when the Kafka supervisor can become 
idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for 
more details.|No|null|
 
-```json
-{
-  "channel": "#sv.wikipedia",
-  "timestamp": "2016-06-27T00:00:11.080Z",
-  "page": "Salo Toraut",
-  "delta": 31,
-  "namespace": "Main",
-  "kafka.timestamp": 1680795276351,
-  "kafka.topic": "wiki-edits",
-  "kafka.header.env": "development",
-  "kafka.header.zone": "z1",
-  "kafka.key": "wiki-edit"
-}
-```
+#### Consumer properties
 
-For more information on data formats, see [Data 
formats](../../ingestion/data-formats.md).
+Consumer properties must contain a property `bootstrap.servers` with a list of 
Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.
+By default, `isolation.level` is set to `read_committed`. If you use older 
versions of Kafka servers without transactions support or don't want Druid to 
consume only committed transactions, set `isolation.level` to 
`read_uncommitted`.
 
-Finally, add these Kafka metadata columns to the `dimensionsSpec` or  set your 
`dimensionsSpec` to auto-detect columns.
-     
-The following supervisor spec demonstrates how to ingest the Kafka header, 
key, timestamp, and topic into Druid dimensions:
+In some cases, you may need to fetch consumer properties at runtime, for example, when `bootstrap.servers` is not known upfront or is not static. To enable SSL connections, you must provide the passwords for the `keystore`, `truststore`, and `key` without exposing them in plain text. You can supply these configurations at runtime with a dynamic config provider implementation, such as the environment variable config provider that comes with Druid. For more information, see [Dynamic config provider](../../operations/dynamic-config-provider.md).
+
+For example, if you are using SASL and SSL with Kafka, set the following 
environment variables for the Druid user on the machines running the Overlord 
and the Peon services:
 
 ```
-{
-  "type": "kafka",
-  "spec": {
-    "ioConfig": {
-      "type": "kafka",
-      "consumerProperties": {
-        "bootstrap.servers": "localhost:9092"
-      },
-      "topic": "wiki-edits",
-      "inputFormat": {
-        "type": "kafka",
-        "valueFormat": {
-          "type": "json"
-        },
-        "headerFormat": {
-          "type": "string"
-        },
-        "keyFormat": {
-          "type": "tsv",
-          "findColumnsFromHeader": false,
-          "columns": ["x"]
-        }
-      },
-      "useEarliestOffset": true
-    },
-    "dataSchema": {
-      "dataSource": "wikiticker",
-      "timestampSpec": {
-        "column": "timestamp",
-        "format": "posix"
-      },
-      "dimensionsSpec":  "dimensionsSpec": {
-        "useSchemaDiscovery": true,
-        "includeAllDimensions": true
-      },
-      "granularitySpec": {
-        "queryGranularity": "none",
-        "rollup": false,
-        "segmentGranularity": "day"
+export 
KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule 
required username='admin_user' password='admin_password';"
+export SSL_KEY_PASSWORD=mysecretkeypassword
+export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword
+export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword
+```
+
+```json
+  "druid.dynamic.config.provider": {
+    "type": "environment",
+     "variables": {
+        "sasl.jaas.config": "KAFKA_JAAS_CONFIG",
+        "ssl.key.password": "SSL_KEY_PASSWORD",
+        "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD",
+        "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD"
       }
-    },
-    "tuningConfig": {
-      "type": "kafka"
-    }
   }
-}
 ```
 
-After Druid ingests the data, you can query the Kafka metadata columns as 
follows:
+Verify that you've changed the values for all configurations to match your own environment. In the Druid data loader interface, you can use the environment variable config provider syntax in the **Consumer properties** field on the **Connect** tab. When connecting to Kafka, Druid replaces the environment variables with their corresponding values.
 
-```sql
-SELECT
-  "kafka.header.env",
-  "kafka.key",
-  "kafka.timestamp",
-  "kafka.topic"
-FROM "wikiticker"
-```
+#### Task autoscaler
 
-This query returns:
+You can optionally configure autoscaling behavior for ingestion tasks using 
the `autoScalerConfig` property of the `ioConfig` object.
 
-| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` |
-|--------------------|-----------|---------------|---------------|
-| `development`      | `wiki-edit` | `1680795276351` | `wiki-edits`  |
+The following table outlines the configuration options for `autoScalerConfig`:
 
-For more information, see [`kafka` data 
format](../../ingestion/data-formats.md#kafka).
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`enableTaskAutoScaler`|Enables the auto scaler. If not specified or set to `false`, Druid disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`|
+|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= 
taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales 
reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is 
ignored.|Yes||
+|`taskCountMin`|Minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and initially launches `taskCountMin` tasks.|Yes||
+|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two 
scale actions.| No|600000|
+|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the 
`lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more 
information.|No|`lagBased`|
 
-## Submit a supervisor spec
+##### Autoscaler strategy
 
-Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor 
spec to the following endpoint:
+The following table outlines the configuration options for 
`autoScalerStrategy`:
 
-`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`lagCollectionIntervalMillis`|The time period during which Druid collects lag 
metric points.|No|30000|
+|`lagCollectionRangeMillis`|The total time window of lag collection. Use with 
`lagCollectionIntervalMillis` to specify the intervals at which to collect lag 
metric points.|No|600000|
+|`scaleOutThreshold`|The lag threshold that triggers the scale out action.|No|6000000|
+|`triggerScaleOutFractionThreshold`|Triggers a scale out action if the fraction of lag points higher than `scaleOutThreshold` exceeds this value.|No|0.3|
+|`scaleInThreshold`|The lag threshold that triggers the scale in action.|No|1000000|
+|`triggerScaleInFractionThreshold`|Triggers a scale in action if the fraction of lag points lower than `scaleInThreshold` exceeds this value.|No|0.9|
+|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the 
supervisor starts before the first scale logic check.|No|300000|
+|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered.|No|60000|
+|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
+|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
 
-For example: 
+#### Idle supervisor configuration
 
-```
-curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json 
http://localhost:8090/druid/indexer/v1/supervisor
-```
+:::info
+Idle state transitioning is currently designated as experimental.
+:::
+
+When the supervisor enters the idle state, it launches no new tasks after the currently executing tasks complete. This can reduce costs for cluster operators when topics receive data only sporadically.
+
+The following table outlines the configuration options for `idleConfig`:
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|`enabled`|If `true`, the supervisor becomes idle if there is no data on the input stream or topic for some time.|No|`false`|
+|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has been read from the input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|No|`600_000`|
+
+The following example shows a supervisor spec with `lagBased` autoscaler and 
idle configuration enabled:
+
+<details>
+  <summary>Click to view the example</summary>
 
-Where the file `supervisor-spec.json` contains your Kafka supervisor spec file.
+```json
+{
+    "type": "kafka",
+    "spec": {
+      "dataSchema": {
+        ...
+      },
+      "ioConfig": {
+         "topic": "metrics",
+         "inputFormat": {
+             "type": "json"
+         },
+          "consumerProperties": {
+             "bootstrap.servers": "localhost:9092"
+          },
+         "autoScalerConfig": {
+             "enableTaskAutoScaler": true,
+              "taskCountMax": 6,
+              "taskCountMin": 2,
+             "minTriggerScaleActionFrequencyMillis": 600000,
+             "autoScalerStrategy": "lagBased",
+              "lagCollectionIntervalMillis": 30000,
+              "lagCollectionRangeMillis": 600000,
+              "scaleOutThreshold": 6000000,
+              "triggerScaleOutFractionThreshold": 0.3,
+             "scaleInThreshold": 1000000,
+             "triggerScaleInFractionThreshold": 0.9,
+              "scaleActionStartDelayMillis": 300000,
+              "scaleActionPeriodMillis": 60000,
+              "scaleInStep": 1,
+             "scaleOutStep": 2
+         },
+         "taskCount":1,
+         "replicas":1,
+         "taskDuration":"PT1H",
+         "idleConfig": {
+           "enabled": true,
+           "inactiveAfterMillis": 600000 
+         }
+      },
+     "tuningConfig":{
+        ...
+     }
+    }
+}
+```
+</details>
+
+#### Ingest from multiple topics
+
+:::info
+If you enable multi-topic ingestion for a datasource, downgrading to a version 
older than
+28.0.0 will cause the ingestion for that datasource to fail.
+:::
+
+To ingest data from multiple topics, set `topicPattern` instead of `topic` in the supervisor `ioConfig` object.
+You can pass a regex pattern that matches multiple topics as the value for `topicPattern` in `ioConfig`. For example, to
+ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioConfig`.
+Similarly, you can use `metrics-.*` as the value for `topicPattern` if you 
want to ingest from all the topics that
+start with `metrics-`. If new topics are added to the cluster that match the 
regex, Druid automatically starts
+ingesting from those new topics. A topic name that only partially matches the regex, such as `my-metrics-12`, is not
+included for ingestion.
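
To make the property swap concrete, here is a hedged sketch of the relevant `ioConfig` fragment, reusing the `metrics-.*` pattern from the paragraph above:

```json
"ioConfig": {
  "topicPattern": "metrics-.*",
  "inputFormat": {
    "type": "json"
  },
  "consumerProperties": {
    "bootstrap.servers": "localhost:9092"
  }
}
```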
+
+When ingesting data from multiple topics, partitions are assigned based on the 
hashcode of the topic name and the
+ID of the partition within that topic. The partition assignment might not be 
uniform across all the tasks. It's also
+assumed that partitions across individual topics have similar load. If you want to ingest from both high-load and
+low-load topics in the same supervisor, it is recommended that you have a higher number of partitions for the
+high-load topic and a lower number of partitions for the low-load topic.
+
+### Tuning configuration
+
+The `tuningConfig` object is optional. If you don't specify the `tuningConfig` 
object, Druid uses the default configuration settings.
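
As a minimal sketch, an explicit `tuningConfig` that keeps the defaults except for a single tunable might look like the following; `maxRowsInMemory` and its value are illustrative assumptions:

```json
"tuningConfig": {
  "type": "kafka",
  "maxRowsInMemory": 150000
}
```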
+
+The following table outlines the configuration options for `tuningConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`type`|String|The indexing task type. This should always be `kafka`.|Yes||

Review Comment:
   ```suggestion
   |`type`|String|The indexing task type. Must be `kafka`.|Yes||
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

