jsun98 opened a new pull request #6431: Add Kinesis Indexing Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431

## Design and major changes

Added `kinesis-indexing-service` to the core Druid extensions. Because it is so similar to `kafka-indexing-service`, I've created a corresponding set of abstract classes in the package `org.apache.druid.indexing.seekablestream` to serve as the parent classes for both Kafka and Kinesis. Logic common to the Kinesis and Kafka indexing services is grouped into these abstract classes as much as possible; the child classes define the operations specific to Kafka or Kinesis.

The logic in `KinesisIndexTask` and `KafkaIndexTask` has not been merged yet, because Kinesis does not yet support incremental handoff. The merge will be done in a follow-up PR. The code in the two index tasks is left as is for now.

The unit tests for Kinesis depend on [LocalStack](https://github.com/localstack/localstack), a local AWS testing stack, which requires a **running Docker** instance.

The following classes can be found in the `org.apache.druid.indexing.seekablestream` package.

### `SeekableStreamSupervisor`

- all logic is essentially the same as in `KafkaSupervisor`, including the lifecycle, `start()`, and `stop()`
- moved `runInternal()` and the methods it calls here
- replaced all `Kafka` or `Kinesis` specific objects (e.g. `KafkaDataSourceMetadata`) with the corresponding `SeekableStream` version (e.g. `SeekableStreamDataSourceMetadata`)
- the `protected abstract` methods defined at the end of the class perform Kafka or Kinesis specific tasks; for example, `checkSourceMetaDataMatch()` checks whether the passed-in metadata is an instance of Kafka/Kinesis datasource metadata
- `KafkaConsumer` instances have been replaced by `RecordSupplier`, see below
- incremental handoff / checkpointing is not yet implemented for Kinesis, but will be in a follow-up PR
- Kinesis does not have the concept of `lag`, so something similar will be implemented in a follow-up PR

### `RecordSupplier`

- this is a wrapper interface for consuming records from Kafka and Kinesis streams
- implemented by `KafkaRecordSupplier` and `KinesisRecordSupplier`
- currently used in `SeekableStreamSupervisor`; it will also be used in `SeekableStreamIndexTask` in the follow-up PR
- it provides a uniform interface for reading records from Kafka and Kinesis streams, using the `StreamPartition` and `Record` classes to abstract the differences between Kafka and Kinesis partitions / offsets / sequence numbers
- methods usually take a `StreamPartition` as input and return a `Record`
- `KafkaRecordSupplier` wraps the `KafkaConsumer` object
- `KinesisRecordSupplier` uses an `AmazonKinesisClient` to get records from a Kinesis stream

### `SequenceNumber`

- this is the wrapper class for a Kafka/Kinesis sequence number / offset
- implemented by `KafkaSequenceNumber` and `KinesisSequenceNumber`
- its main purpose is to make sequence numbers / offsets comparable, so that we can check that they arrive in the correct order

### `SeekableStreamPartitions`

- the `[Kafka/Kinesis]Partitions` classes were removed in favor of this class
- because the class is generic, deserializing it with Jackson requires constructing a specific `TypeReference`; see the code for details

### `SeekableStreamIndexTaskRunner`

- this interface is basically `KafkaIndexTaskRunner` renamed
- the Kinesis index task currently does not separate out the runner logic as Kafka does, so this interface is only implemented by the Kafka task runners
- will be implemented by the Kinesis tasks in a follow-up PR

### `TaskReportData`

- moved this class from the Kafka indexing service into SeekableStream

## Files not changed significantly

These files have been refactored to extend the SeekableStream superclasses, but their code has not changed much, since the Kinesis and Kafka versions are similar enough:

- `[Kafka/Kinesis]SupervisorIOConfig`
- `[Kafka/Kinesis]SupervisorReportPayload`
- `[Kafka/Kinesis]SupervisorSpec`
- `[Kafka/Kinesis]SupervisorTuningConfig`
- `[Kafka/Kinesis]TaskReportData`
- `[Kafka/Kinesis]DataSourceMetadata`
- `[Kafka/Kinesis]IndexTask`
- `[Kafka/Kinesis]IndexTaskClient`
- `[Kafka/Kinesis]IndexTaskClientFactory`
- `[Kafka/Kinesis]IOConfig`
- `[Kafka/Kinesis]TuningConfig`
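
For readers who want a concrete picture of the `SequenceNumber` idea described above, here is a minimal, self-contained sketch. It is not the code in this PR: the class shapes and the `isInOrder` helper are hypothetical, and it assumes that Kafka offsets are `long` values and that Kinesis sequence numbers are decimal strings that need to be compared numerically rather than lexicographically.

```java
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

class SequenceNumberSketch {
    // Hypothetical stand-in for the SequenceNumber wrapper: it holds the raw
    // value and leaves the ordering rule to the stream-specific subclass.
    abstract static class SequenceNumber<T> implements Comparable<SequenceNumber<T>> {
        private final T value;

        SequenceNumber(T value) {
            this.value = value;
        }

        T get() {
            return value;
        }
    }

    // Assumption: a Kafka offset is a long, so plain numeric comparison works.
    static final class KafkaSequenceNumber extends SequenceNumber<Long> {
        KafkaSequenceNumber(Long offset) {
            super(offset);
        }

        @Override
        public int compareTo(SequenceNumber<Long> other) {
            return Long.compare(get(), other.get());
        }
    }

    // Assumption: a Kinesis sequence number is a decimal string that can be
    // longer than a long, so it is parsed into a BigInteger before comparing.
    static final class KinesisSequenceNumber extends SequenceNumber<String> {
        private final BigInteger numeric;

        KinesisSequenceNumber(String sequenceNumber) {
            super(sequenceNumber);
            this.numeric = new BigInteger(sequenceNumber);
        }

        @Override
        public int compareTo(SequenceNumber<String> other) {
            return numeric.compareTo(new BigInteger(other.get()));
        }
    }

    // Hypothetical helper: true if the sequence numbers never go backwards.
    static <T> boolean isInOrder(List<? extends SequenceNumber<T>> seqs) {
        for (int i = 1; i < seqs.size(); i++) {
            if (seqs.get(i - 1).compareTo(seqs.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isInOrder(Arrays.asList(
            new KafkaSequenceNumber(3L), new KafkaSequenceNumber(7L))));
        System.out.println(isInOrder(Arrays.asList(
            new KinesisSequenceNumber("9"), new KinesisSequenceNumber("10"))));
    }
}
```

The point of the wrapper in this sketch is that shared code can check ordering through `compareTo` alone, without knowing whether the underlying value is a Kafka offset or a Kinesis sequence number.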
