jsun98 opened a new pull request #6431: Add Kinesis Indexing Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431
 
 
   ## Design and major changes
   
   Added `kinesis-indexing-service` to the core Druid extensions. Because it is so similar to `kafka-indexing-service`, I've created a corresponding set of abstract classes in the package `org.apache.druid.indexing.seekablestream` that serve as the parent classes for both Kafka and Kinesis.
   
   Logic common to the Kinesis and Kafka indexing services is grouped into these abstract classes as much as possible, while the child classes define operations specific to Kafka or Kinesis.
   
   The logic in `KinesisIndexTask` and `KafkaIndexTask` has not been merged yet because Kinesis does not yet support incremental handoff; this will be implemented in a follow-up PR. The code in the two index tasks is left as is for now.
   
   The unit tests for Kinesis depend on [LocalStack](https://github.com/localstack/localstack), a local AWS testing stack, which requires a **running Docker** instance.
   
   The following classes can be found in the `org.apache.druid.indexing.seekablestream` package:
   ### `SeekableStreamSupervisor`
   - all logic is essentially the same as in `KafkaSupervisor`, including the lifecycle, `start()`, and `stop()`
   - moved `runInternal()` and the methods it calls into this class
   - replaced all Kafka- or Kinesis-specific objects (e.g. `KafkaDataSourceMetadata`) with the corresponding `SeekableStream` version (e.g. `SeekableStreamDataSourceMetadata`)
   - the `protected abstract` methods defined at the end of the class perform Kafka- or Kinesis-specific tasks; for example, `checkSourceMetaDataMatch()` checks whether the passed-in metadata is an instance of Kafka/Kinesis datasource metadata
   - `KafkaConsumer` instances have been replaced by `RecordSupplier` (see below)
   - incremental handoff / checkpointing is not yet implemented for Kinesis, but will be in a follow-up PR
   - Kinesis does not have Kafka's concept of `lag`, so a comparable measure will be implemented in a follow-up PR
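   The split described in the bullets above (shared logic in the abstract parent, stream-specific checks in `protected abstract` hooks) can be sketched as a template-method hierarchy. This is a minimal, hypothetical illustration: only `checkSourceMetaDataMatch()` comes from this PR; `StreamMetadata`, `StreamSupervisor`, `validate()`, and the Kafka/Kinesis child classes are invented stand-ins, not Druid's actual API.

```java
import java.util.Map;

// Hypothetical sketch of the abstract-parent / stream-specific-child split.
// Only checkSourceMetaDataMatch() is named in the PR; all other types and
// methods are illustrative assumptions, not Druid's actual API.
class SupervisorSketch {

  // Stand-in for SeekableStreamDataSourceMetadata: partition id -> sequence/offset.
  abstract static class StreamMetadata<PartitionT, SequenceT> {
    final Map<PartitionT, SequenceT> sequences;

    StreamMetadata(Map<PartitionT, SequenceT> sequences) {
      this.sequences = sequences;
    }
  }

  // Kafka: integer partitions, long offsets.
  static final class KafkaStreamMetadata extends StreamMetadata<Integer, Long> {
    KafkaStreamMetadata(Map<Integer, Long> sequences) {
      super(sequences);
    }
  }

  // Kinesis: string shard ids, string sequence numbers.
  static final class KinesisStreamMetadata extends StreamMetadata<String, String> {
    KinesisStreamMetadata(Map<String, String> sequences) {
      super(sequences);
    }
  }

  abstract static class StreamSupervisor {
    // Shared logic, written once in the parent class.
    final String validate(StreamMetadata<?, ?> metadata) {
      if (!checkSourceMetaDataMatch(metadata)) {
        return "rejected";
      }
      return "accepted " + metadata.sequences.size() + " partition(s)";
    }

    // Stream-specific hook supplied by each child class.
    protected abstract boolean checkSourceMetaDataMatch(StreamMetadata<?, ?> metadata);
  }

  static final class KafkaStreamSupervisor extends StreamSupervisor {
    @Override
    protected boolean checkSourceMetaDataMatch(StreamMetadata<?, ?> metadata) {
      return metadata instanceof KafkaStreamMetadata;
    }
  }

  static final class KinesisStreamSupervisor extends StreamSupervisor {
    @Override
    protected boolean checkSourceMetaDataMatch(StreamMetadata<?, ?> metadata) {
      return metadata instanceof KinesisStreamMetadata;
    }
  }

  public static void main(String[] args) {
    StreamSupervisor kafka = new KafkaStreamSupervisor();
    System.out.println(kafka.validate(new KafkaStreamMetadata(Map.of(0, 42L))));
    System.out.println(kafka.validate(new KinesisStreamMetadata(Map.of("shardId-000000000000", "1"))));
  }
}
```

   Here the Kafka supervisor accepts Kafka metadata and rejects Kinesis metadata, while `validate()` itself is written only once in the parent.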
   
   ### `RecordSupplier`
   - this is a wrapper interface for consuming records from Kafka and Kinesis streams
   - implemented by `KafkaRecordSupplier` and `KinesisRecordSupplier`
   - currently used in `SeekableStreamSupervisor`; it will be used in `SeekableStreamIndexTask` in the follow-up PR
   - it provides a uniform interface for reading records from Kafka and Kinesis streams, using the `StreamPartition` and `Record` classes to abstract away the differences between Kafka and Kinesis partitions, offsets, and sequence numbers
   - methods usually take a `StreamPartition` as input and return `Record`s
   - `KafkaRecordSupplier` wraps the `KafkaConsumer` object
   - `KinesisRecordSupplier` uses an `AmazonKinesisClient` to get records from the Kinesis stream
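   A minimal sketch of the shape described above: a generic supplier interface keyed by `StreamPartition` and returning `Record`s, backed here by an in-memory log instead of a real Kafka or Kinesis client. The class names `RecordSupplier`, `StreamPartition`, and `Record` come from this PR, but their fields and the `seek()`/`poll()` signatures are assumptions for illustration; `InMemoryRecordSupplier` and `append()` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: RecordSupplier, StreamPartition and Record are named in
// the PR, but these fields and signatures are illustrative assumptions only.
class RecordSupplierSketch {

  // Identifies one partition (Kafka) or shard (Kinesis) of a named stream.
  static final class StreamPartition<PartitionT> {
    final String stream;
    final PartitionT partitionId;

    StreamPartition(String stream, PartitionT partitionId) {
      this.stream = stream;
      this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof StreamPartition)) {
        return false;
      }
      StreamPartition<?> other = (StreamPartition<?>) o;
      return stream.equals(other.stream) && Objects.equals(partitionId, other.partitionId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(stream, partitionId);
    }
  }

  // One record plus the position it was read from.
  static final class Record<PartitionT, SequenceT> {
    final StreamPartition<PartitionT> partition;
    final SequenceT sequence;
    final byte[] data;

    Record(StreamPartition<PartitionT> partition, SequenceT sequence, byte[] data) {
      this.partition = partition;
      this.sequence = sequence;
      this.data = data;
    }
  }

  // Uniform read interface; a Kafka or Kinesis client would sit behind it.
  interface RecordSupplier<PartitionT, SequenceT> {
    void seek(StreamPartition<PartitionT> partition, SequenceT sequence);

    List<Record<PartitionT, SequenceT>> poll(int maxRecords);
  }

  // In-memory stand-in with Kafka-like integer partitions and long offsets.
  static final class InMemoryRecordSupplier implements RecordSupplier<Integer, Long> {
    private final Map<StreamPartition<Integer>, List<byte[]>> logs = new HashMap<>();
    private final Map<StreamPartition<Integer>, Long> positions = new HashMap<>();

    void append(StreamPartition<Integer> partition, byte[] payload) {
      logs.computeIfAbsent(partition, p -> new ArrayList<>()).add(payload);
    }

    @Override
    public void seek(StreamPartition<Integer> partition, Long sequence) {
      positions.put(partition, sequence);
    }

    @Override
    public List<Record<Integer, Long>> poll(int maxRecords) {
      List<Record<Integer, Long>> out = new ArrayList<>();
      for (Map.Entry<StreamPartition<Integer>, Long> entry : positions.entrySet()) {
        List<byte[]> log = logs.getOrDefault(entry.getKey(), List.of());
        long pos = entry.getValue();
        while (pos < log.size() && out.size() < maxRecords) {
          out.add(new Record<>(entry.getKey(), Long.valueOf(pos), log.get((int) pos)));
          pos++;
        }
        entry.setValue(pos); // remember where the next poll should resume
      }
      return out;
    }
  }

  public static void main(String[] args) {
    StreamPartition<Integer> p0 = new StreamPartition<>("metrics", 0);
    InMemoryRecordSupplier supplier = new InMemoryRecordSupplier();
    for (String s : new String[] {"a", "b", "c"}) {
      supplier.append(p0, s.getBytes(StandardCharsets.UTF_8));
    }
    supplier.seek(p0, 1L);
    for (Record<Integer, Long> r : supplier.poll(10)) {
      System.out.println(r.sequence + " " + new String(r.data, StandardCharsets.UTF_8));
    }
  }
}
```

   The supervisor code only ever sees the interface, so swapping the in-memory log for a consumer-backed or Kinesis-client-backed implementation would not change the calling code.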
   
   ### `SequenceNumber`
   - this is the wrapper class for Kafka offsets and Kinesis sequence numbers
   - implemented by `KafkaSequenceNumber` and `KinesisSequenceNumber`
   - its main purpose is to make offsets / sequence numbers comparable, so that they can be checked to arrive in the correct order
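   One way the comparison above could work is sketched below. The class names match the PR, but the `value` field and both `compareTo` bodies are assumptions. The Kinesis case is the interesting one: Kinesis sequence numbers are decimal strings that can be much longer than a 64-bit integer, so this sketch compares them as `BigInteger`s, since a plain string comparison would misorder numbers of different lengths.

```java
import java.math.BigInteger;

// Hypothetical sketch of comparable sequence wrappers. The class names come
// from the PR; the fields and compareTo implementations are assumptions.
class SequenceNumberSketch {

  abstract static class SequenceNumber<T> implements Comparable<SequenceNumber<T>> {
    final T value;

    SequenceNumber(T value) {
      this.value = value;
    }
  }

  // Kafka offsets are 64-bit integers, so a numeric comparison suffices.
  static final class KafkaSequenceNumber extends SequenceNumber<Long> {
    KafkaSequenceNumber(long offset) {
      super(offset);
    }

    @Override
    public int compareTo(SequenceNumber<Long> other) {
      return Long.compare(value, other.value);
    }
  }

  // Kinesis sequence numbers are decimal strings that can be far longer than a
  // long, so they are compared as BigIntegers rather than lexicographically.
  static final class KinesisSequenceNumber extends SequenceNumber<String> {
    KinesisSequenceNumber(String sequence) {
      super(sequence);
    }

    @Override
    public int compareTo(SequenceNumber<String> other) {
      return new BigInteger(value).compareTo(new BigInteger(other.value));
    }
  }

  public static void main(String[] args) {
    SequenceNumber<String> earlier = new KinesisSequenceNumber("9");
    SequenceNumber<String> later = new KinesisSequenceNumber("10");
    System.out.println("string order:  " + "9".compareTo("10"));
    System.out.println("numeric order: " + earlier.compareTo(later));
  }
}
```

   The string comparison places "9" after "10", while the `BigInteger` comparison orders them correctly.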
   
   ### `SeekableStreamPartitions`
   - the `[Kafka/Kinesis]Partitions` classes were removed in favor of this class
   - because this class uses generics, deserializing it with Jackson requires constructing a specific `TypeReference`; see the code for details
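   The reason a `TypeReference` is needed is Java type erasure: `SeekableStreamPartitions.class` cannot express its type arguments, so Jackson has no way to know what the partition and sequence types are. Jackson's `TypeReference` works around this with the "super type token" idiom, and the stdlib-only sketch below reproduces that idiom. `Partitions` and `TypeToken` are hypothetical stand-ins, not Druid or Jackson classes. With Jackson itself, the call would presumably look like `mapper.readValue(json, new TypeReference<SeekableStreamPartitions<Integer, Long>>() {})`, with the type arguments chosen per stream.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical, stdlib-only illustration of why a generic class needs a
// TypeReference-style token for deserialization. Partitions is an invented
// stand-in for SeekableStreamPartitions; TypeToken mimics the super-type-token
// idiom that Jackson's TypeReference is built on.
class TypeTokenSketch {

  static final class Partitions<PartitionT, SequenceT> {
    String stream;
    Map<PartitionT, SequenceT> partitionSequenceMap;
  }

  abstract static class TypeToken<T> {
    final Type type;

    protected TypeToken() {
      // An anonymous subclass records its type arguments in its generic
      // superclass, so they can be read back reflectively at runtime.
      ParameterizedType superclass = (ParameterizedType) getClass().getGenericSuperclass();
      this.type = superclass.getActualTypeArguments()[0];
    }
  }

  public static void main(String[] args) {
    // A class literal is erased: it cannot say Partitions<Integer, Long>.
    System.out.println("class literal: " + Partitions.class.getTypeName());
    // The token captures the full parameterized type.
    TypeToken<Partitions<Integer, Long>> token = new TypeToken<Partitions<Integer, Long>>() {};
    System.out.println("type token:    " + token.type.getTypeName());
  }
}
```

   The trailing `{}` is essential: it creates an anonymous subclass, and only a subclass declaration keeps the type arguments available at runtime.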
   
   ### `SeekableStreamIndexTaskRunner`
   - this interface is essentially `KafkaIndexTaskRunner` renamed
   - the Kinesis index task currently does not separate out the runner logic the way the Kafka task does, so this interface is only implemented by the Kafka task runners
   - it will be implemented by the Kinesis tasks in a follow-up PR
   
   ### `TaskReportData`
   - moved this class from the Kafka indexing service into the `seekablestream` package
   
   ## Files not changed significantly
   These files have been refactored to extend the `SeekableStream` superclasses, but their code has not changed much, since the Kinesis and Kafka versions are similar enough:
   - `[Kafka/Kinesis]SupervisorIOConfig`
   - `[Kafka/Kinesis]SupervisorReportPayload`
   - `[Kafka/Kinesis]SupervisorSpec`
   - `[Kafka/Kinesis]SupervisorTuningConfig`
   - `[Kafka/Kinesis]TaskReportData`
   - `[Kafka/Kinesis]DataSourceMetadata`
   - `[Kafka/Kinesis]IndexTask`
   - `[Kafka/Kinesis]IndexTaskClient`
   - `[Kafka/Kinesis]IndexTaskClientFactory`
   - `[Kafka/Kinesis]IOConfig`
   - `[Kafka/Kinesis]TuningConfig`
