jsun98 commented on a change in pull request #6431: Add Kinesis Indexing 
Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431#discussion_r241874329
 
 

 ##########
 File path: docs/content/development/extensions-core/kinesis-ingestion.md
 ##########
 @@ -0,0 +1,384 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+---
+layout: doc_page
+---
+
+# Kinesis Indexing Service
+
+Similar to the [Kafka indexing service](./kafka-ingestion.html), the Kinesis 
indexing service enables the configuration of *supervisors* on the Overlord, 
which facilitate ingestion from
+Kinesis by managing the creation and lifetime of Kinesis indexing tasks. These 
indexing tasks read events using Kinesis's own
+Shards and Sequence Number mechanism and are therefore able to provide 
guarantees of exactly-once ingestion. They are also
+able to read non-recent events from Kinesis and are not subject to the window 
period considerations imposed on other
+ingestion mechanisms. The supervisor oversees the state of the indexing tasks 
to coordinate handoffs, manage failures,
+and ensure that the scalability and replication requirements are maintained.
+
+The Kinesis indexing service is provided as the 
`druid-kinesis-indexing-service` core extension (see
+[Including Extensions](../../operations/including-extensions.html)). Please 
note that this is
+currently designated as an *experimental feature* and is subject to the usual
+[experimental caveats](../experimental.html).
+
+## Submitting a Supervisor Spec
+
+The Kinesis indexing service requires that the 
`druid-kinesis-indexing-service` extension be loaded on both the overlord
+and the middle managers. A supervisor for a dataSource is started by 
submitting a supervisor spec via HTTP POST to
+`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`, for 
example:
+
+```
+curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor
+```
+
+A sample supervisor spec is shown below:
+
+```json
+{
+  "type": "kinesis",
+  "dataSchema": {
+    "dataSource": "metrics-kinesis",
+    "parser": {
+      "type": "string",
+      "parseSpec": {
+        "format": "json",
+        "timestampSpec": {
+          "column": "timestamp",
+          "format": "auto"
+        },
+        "dimensionsSpec": {
+          "dimensions": [],
+          "dimensionExclusions": [
+            "timestamp",
+            "value"
+          ]
+        }
+      }
+    },
+    "metricsSpec": [
+      {
+        "name": "count",
+        "type": "count"
+      },
+      {
+        "name": "value_sum",
+        "fieldName": "value",
+        "type": "doubleSum"
+      },
+      {
+        "name": "value_min",
+        "fieldName": "value",
+        "type": "doubleMin"
+      },
+      {
+        "name": "value_max",
+        "fieldName": "value",
+        "type": "doubleMax"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "HOUR",
+      "queryGranularity": "NONE"
+    }
+  },
+  "tuningConfig": {
+    "type": "kinesis",
+    "maxRowsPerSegment": 5000000
+  },
+  "ioConfig": {
+    "stream": "metrics",
+    "endpoint": "kinesis.us-east-1.amazonaws.com",
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT1H",
+    "recordsPerFetch": 2000,
+    "fetchDelayMillis": 1000
+  }
+}
+```
+
+## Supervisor Configuration
+
+|Field|Description|Required|
+|--------|-----------|---------|
+|`type`|The supervisor type; this should always be `kinesis`.|yes|
+|`dataSchema`|The schema that will be used by the Kinesis indexing task during 
ingestion, see [Ingestion Spec 
DataSchema](../../ingestion/ingestion-spec.html#dataschema).|yes|
+|`tuningConfig`|A KinesisSupervisorTuningConfig to configure the supervisor 
and indexing tasks, see below.|no|
+|`ioConfig`|A KinesisSupervisorIOConfig to configure the supervisor and 
indexing tasks, see below.|yes|
+
+### KinesisSupervisorTuningConfig
+
+The tuningConfig is optional and default parameters will be used if no 
tuningConfig is specified.
+
+|Field|Type|Description|Required|
+|-----|----|-----------|--------|
+|`type`|String|The indexing task type; this should always be `kinesis`.|yes|
+|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. 
This number is the post-aggregation rows, so it is not equivalent to the number 
of input events, but the number of aggregated rows that those events result in. 
This is used to manage the required JVM heap size. Maximum heap memory usage 
for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no 
(default == 100000)|
+|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory 
before persisting. This is based on a rough estimate of memory usage and not 
actual usage. Normally this is computed internally and user does not need to 
set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + 
maxPendingPersists).  |no (default == One-sixth of max JVM memory)|
+|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; 
this number is post-aggregation rows. Handoff will happen either if 
`maxRowsPerSegment` or `maxTotalRows` is hit or every 
`intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
+|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this 
number is post-aggregation rows. Handoff will happen either if 
`maxRowsPerSegment` or `maxTotalRows` is hit or every 
`intermediateHandoffPeriod`, whichever happens earlier.|no (default == 
unlimited)|
+|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the 
rate at which intermediate persists occur.|no (default == PT10M)|
+|`maxPendingPersists`|Integer|Maximum number of persists that can be pending 
but not started. If this limit would be exceeded by a new intermediate persist, 
ingestion will block until the currently-running persist finishes. Maximum heap 
memory usage for indexing scales with maxRowsInMemory * (2 + 
maxPendingPersists).|no (default == 0, meaning one persist can be running 
concurrently with ingestion, and none can be queued up)|
+|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more 
details.|no|
+|`reportParseExceptions`|Boolean|If true, exceptions encountered during 
parsing will be thrown and will halt ingestion; if false, unparseable rows and 
fields will be skipped.|no (default == false)|
+|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It 
must be >= 0, where 0 means to wait forever.|no (default == 0)|
+|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer sequence 
numbers if the next sequence number that it is trying to fetch is less than the 
earliest available sequence number for that particular shard. The sequence 
number will be reset to either the earliest or latest sequence number depending 
on the `useEarliestSequenceNumber` property of `KinesisSupervisorIOConfig` (see below). 
This situation typically occurs when messages in Kinesis are no longer 
available for consumption and therefore won't be ingested into Druid. If set to 
false, ingestion for that particular shard will halt and manual 
intervention is required to correct the situation; see the `Reset 
Supervisor` API below.|no (default == false)|
+|`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if 
the current sequence number is still available in a particular Kinesis shard. 
If set to false, the indexing task will attempt to reset the current sequence 
number (or not), depending on the value of `resetOffsetAutomatically`. |no 
(default == false)|
+|`workerThreads`|Integer|The number of threads that will be used by the 
supervisor for asynchronous operations.|no (default == min(10, taskCount))|
+|`chatThreads`|Integer|The number of threads that will be used for 
communicating with indexing tasks.|no (default == min(10, taskCount * 
replicas))|
+|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks 
will be retried before considering tasks unresponsive.|no (default == 8)|
+|`httpTimeout`|ISO8601 Period|How long to wait for an HTTP response from an 
indexing task.|no (default == PT10S)|
+|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to 
attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
+|`recordBufferSize`|Integer|Size of the buffer (number of events) used between 
the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
+|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for 
space to become available in the buffer before timing out.|no (default == 5000)|
+|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the 
buffer to drain before attempting to fetch records from Kinesis again.|no 
(default == 5000)|
+|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait 
for Kinesis to return the earliest or latest sequence number for a shard. 
Kinesis will not return the latest sequence number if no data is actively being 
written to that shard. In this case, this fetch call will repeatedly time out 
and retry until fresh data is written to the stream.|no (default == 60000)|
+|`fetchThreads`|Integer|Size of the pool of threads fetching data from 
Kinesis. There is no benefit in having more threads than Kinesis shards.|no 
(default == max(1, {numProcessors} - 1))|
+|`segmentWriteOutMediumFactory`|Object|Segment write-out medium to use when 
creating segments. See below for more information.|no (not specified by 
default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` 
is used)|
+|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand 
off segments. Handoff will happen either if `maxRowsPerSegment` or 
`maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens 
earlier.|no (default == P2147483647D)|
+|`logParseExceptions`|Boolean|If true, log an error message when a parsing 
exception occurs, containing information about the row where the error 
occurred.|no (default == false)|
+|`maxParseExceptions`|Integer|The maximum number of parse exceptions that can 
occur before the task halts ingestion and fails. Overridden if 
`reportParseExceptions` is set.|no (default == unlimited)|
+|`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid can 
keep track of the most recent parse exceptions. `maxSavedParseExceptions` 
limits how many exception instances will be saved. These saved exceptions will 
be made available after the task finishes in the [task completion 
report](../../ingestion/reports.html). Overridden if `reportParseExceptions` is 
set.|no (default == 0)|
+|`maxRecordsPerPoll`|Integer|The maximum number of records/events to be 
fetched from buffer per poll. The actual maximum will be 
`Max(maxRecordsPerPoll, Max(bufferSize, 1))`.|no (default == 100)|
+
+#### IndexSpec
+
+|Field|Type|Description|Required|
+|-----|----|-----------|--------|
+|bitmap|Object|Compression format for bitmap indexes. Should be a JSON object; 
see below for options.|no (defaults to Concise)|
+|dimensionCompression|String|Compression format for dimension columns. Choose 
from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
+|metricCompression|String|Compression format for metric columns. Choose from 
`LZ4`, `LZF`, `uncompressed`, or `none`.|no (default == `LZ4`)|
+|longEncoding|String|Encoding format for metric and dimension columns with 
type long. Choose from `auto` or `longs`. `auto` encodes the values using 
offset or lookup table depending on column cardinality, and stores them 
with variable size. `longs` stores the value as is with 8 bytes each.|no 
(default == `longs`)|
+
+##### Bitmap types
+
+For Concise bitmaps:
+
+|Field|Type|Description|Required|
+|-----|----|-----------|--------|
+|`type`|String|Must be `concise`.|yes|
+
+For Roaring bitmaps:
+
+|Field|Type|Description|Required|
+|-----|----|-----------|--------|
+|`type`|String|Must be `roaring`.|yes|
+|`compressRunOnSerialization`|Boolean|Use a run-length encoding where it is 
estimated as more space efficient.|no (default == `true`)|
+
+#### SegmentWriteOutMediumFactory
+
+|Field|Type|Description|Required|
+|-----|----|-----------|--------|
+|`type`|String|See [Additional Peon Configuration: 
SegmentWriteOutMediumFactory](../../configuration/index.html#segmentwriteoutmediumfactory)
 for explanation and available options.|yes|
+
+### KinesisSupervisorIOConfig
+
+|Field|Type|Description|Required|
+|-----|----|-----------|--------|
+|`stream`|String|The Kinesis stream to read.|yes|
+|`endpoint`|String|The AWS Kinesis stream endpoint for a region. You can find 
a list of endpoints 
[here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).|no 
(default == kinesis.us-east-1.amazonaws.com)|
+|`replicas`|Integer|The number of replica sets, where 1 means a single set of 
tasks (no replication). Replica tasks will always be assigned to different 
workers to provide resiliency against node failure.|no (default == 1)|
+|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. 
This means that the maximum number of reading tasks will be `taskCount * 
replicas` and the total number of tasks (*reading* + *publishing*) will be 
higher than this. See 'Capacity Planning' below for more details. The number of 
reading tasks will be less than `taskCount` if `taskCount > 
{numKinesisShards}`.|no (default == 1)|
+|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading 
and begin publishing their segment. Note that segments are only pushed to deep 
storage and loadable by historical nodes when the indexing task completes.|no 
(default == PT1H)|
+|`startDelay`|ISO8601 Period|The period to wait before the supervisor starts 
managing tasks.|no (default == PT5S)|
+|`period`|ISO8601 Period|How often the supervisor will execute its management 
logic. Note that the supervisor will also run in response to certain events 
(such as tasks succeeding, failing, and reaching their taskDuration) so this 
value specifies the maximum time between iterations.|no (default == PT30S)|
+|`useEarliestSequenceNumber`|Boolean|If a supervisor is managing a dataSource 
for the first time, it will obtain a set of starting sequence numbers from 
Kinesis. This flag determines whether it retrieves the earliest or latest 
sequence numbers in Kinesis. Under normal circumstances, subsequent tasks will 
start from where the previous segments ended so this flag will only be used on 
first run.|no (default == false)|
+|`completionTimeout`|ISO8601 Period|The length of time to wait before 
declaring a publishing task as failed and terminating it. If this is set too 
low, your tasks may never publish. The publishing clock for a task begins 
roughly after `taskDuration` elapses.|no (default == PT6H)|
+|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no 
(default == none)|
+|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be 
dropped unexpectedly whenever a task runs past its originally configured task 
duration.|no (default == none)|
+|`recordsPerFetch`|Integer|The number of records to request per GetRecords 
call to Kinesis. See 'Determining Fetch Settings' below.|no (default == 2000)|
+|`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent 
GetRecords calls to Kinesis. See 'Determining Fetch Settings' below.|no 
(default == 1000)|
 
 Review comment:
   Talked to Dave about this. `recordsPerFetch`, `fetchDelayMillis`, and 
`deaggregate` are related to configuring the input streams, similar to 
`ConsumerProperties` and `isSkipOffsetGaps` in Kafka. So I think leaving them 
in ioConfig is fine.
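
For context on why these settings sit naturally next to the stream definition, here is a rough Python sketch of how `recordsPerFetch` and `fetchDelayMillis` interact. It is not taken from the PR: the helper names `max_records_per_second` and `max_reading_tasks` are hypothetical, and the estimate assumes every GetRecords call returns a full batch instantly, so the configured delay is the only pacing. Whether the bound applies per shard or per fetch thread is also left as an assumption.

```python
import json

# ioConfig values copied from the sample supervisor spec in the diff above.
io_config = {
    "stream": "metrics",
    "endpoint": "kinesis.us-east-1.amazonaws.com",
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H",
    "recordsPerFetch": 2000,
    "fetchDelayMillis": 1000,
}


def max_records_per_second(records_per_fetch, fetch_delay_millis):
    """Upper bound on records/second for one fetch loop (hypothetical helper).

    Assumes each GetRecords call returns a full batch and takes no time,
    so the delay between calls is the only thing limiting the rate.
    """
    if fetch_delay_millis <= 0:
        raise ValueError("fetch_delay_millis must be positive for this estimate")
    return records_per_fetch * 1000.0 / fetch_delay_millis


def max_reading_tasks(task_count, replicas):
    """Maximum number of reading tasks, per the `taskCount` description."""
    return task_count * replicas


if __name__ == "__main__":
    print(json.dumps(io_config, indent=2))
    print("fetch ceiling (records/s):",
          max_records_per_second(io_config["recordsPerFetch"],
                                 io_config["fetchDelayMillis"]))
    print("max reading tasks:",
          max_reading_tasks(io_config["taskCount"], io_config["replicas"]))
```

Halving `fetchDelayMillis` doubles the ceiling in this model, which is the kind of trade-off the 'Determining Fetch Settings' section referenced in the table is presumably meant to cover.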
