dannycranmer opened a new pull request #12881: URL: https://github.com/apache/flink/pull/12881
## What is the purpose of the change

This is the second milestone of [FLIP-128](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers), adding EFO support to the `FlinkKinesisConsumer`. The consumption mechanism in `ShardConsumer` has been abstracted into an interface and implemented to retain the existing behaviour. An EFO implementation will be added in a future contribution. This change should not introduce any functional differences.

## Note

This PR is blocked by:
- Test coverage improvements:
  - [JIRA issue](https://issues.apache.org/jira/browse/FLINK-18483)
  - [Pull request](https://github.com/apache/flink/pull/12850)

## Brief change log

- `RecordPublisher`
  - Added a new interface used to consume records from Kinesis and supply them to Flink
- `PollingRecordPublisher`
  - The existing consumption mechanism using `AWS::GetRecords` and `AWS::GetShardIterator` has been refactored into an implementation of `RecordPublisher`. A factory has been added to create instances
  - Configuration that was previously deserialised in the constructor has been moved into a `PollingRecordPublisherConfiguration` class
- `AdaptivePollingRecordPublisher`
  - When a user enables `useAdaptiveReads`, a special `RecordPublisher` is created that adds an adaptive loop delay and batch size. This functionality has been moved out of `ShardConsumer` and now has its own implementation extending `PollingRecordPublisher`
- `StartingPosition`
  - A new object that encapsulates the starting sequence number/iterator type. A starting position can be created from a `SentinelSequenceNumber`
- `ShardMetricsReporter`
  - Metric reporting has been split to allow arbitrary metrics to be registered by `RecordPublisher` implementations
  - `ShardConsumer` reports general metrics using `ShardConsumerMetricsReporter`
  - `PollingRecordPublisher` reports metrics using `PollingRecordPublisherMetricsReporter`
- `ShardConsumer`
  - Refactored to accept a `RecordPublisher` rather than depend on a `KinesisProxyInterface`
  - The logic for restarting from an aggregated sequence number has been moved to support generic `RecordPublisher` implementations:
    - Before: at startup, a single record was downloaded and already-processed subsequences were discarded
    - After: during consumption, any already-processed subsequences are discarded
    - Note: this adds a small overhead to continuous consumption, but simplifies `RecordPublisher` implementations. An alternative would be for `ShardConsumer` to handle restarting from aggregated records via a standard `KinesisProxy`

## Verifying this change

This change is already covered by existing tests, such as:
- `ShardConsumerTest`

A preceding pull request was submitted to increase test coverage before performing the refactor:
- https://github.com/apache/flink/pull/12850

This change added tests and can be verified as follows:
- `RecordBatch` -> `RecordBatchTest`
- `PollingRecordPublisherConfiguration` -> `PollingRecordPublisherConfigurationTest`
- `PollingRecordPublisherFactory` -> `PollingRecordPublisherFactoryTest`
- `PollingRecordPublisher` -> `PollingRecordPublisherTest`
- `PollingRecordPublisherMetricsReporter` -> `PollingRecordPublisherMetricsReporterTest`
- `ShardConsumerMetricsReporter` -> `ShardConsumerMetricsReporterTest`
- `StartingPosition` -> `StartingPositionTest`

I have deployed Flink applications locally and on AWS KDA-Java and verified consumption.
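The publisher abstraction described above can be sketched roughly as follows. This is an illustrative toy, not the actual Flink API: beyond the `RecordPublisher`/`PollingRecordPublisher` names taken from the change log, the method signatures, the run-result enum, and the in-memory batch source are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative sketch only: the real Flink interfaces differ in detail.
public class RecordPublisherSketch {

    /** Result of a single publish invocation (assumed shape). */
    enum RecordPublisherRunResult { COMPLETE, INCOMPLETE }

    /** Abstraction over how records are pulled from a Kinesis shard. */
    interface RecordPublisher {
        // Fetch the next batch of records and hand them to the consumer.
        RecordPublisherRunResult run(Consumer<List<String>> recordConsumer);
    }

    /** A toy polling implementation backed by an in-memory queue of batches. */
    static class PollingRecordPublisher implements RecordPublisher {
        private final Queue<List<String>> batches;

        PollingRecordPublisher(Queue<List<String>> batches) {
            this.batches = batches;
        }

        @Override
        public RecordPublisherRunResult run(Consumer<List<String>> recordConsumer) {
            List<String> batch = batches.poll();
            if (batch == null) {
                return RecordPublisherRunResult.COMPLETE;
            }
            recordConsumer.accept(batch);
            return batches.isEmpty()
                    ? RecordPublisherRunResult.COMPLETE
                    : RecordPublisherRunResult.INCOMPLETE;
        }
    }

    public static void main(String[] args) {
        Queue<List<String>> source = new ArrayDeque<>();
        source.add(List.of("record-1", "record-2"));
        source.add(List.of("record-3"));

        RecordPublisher publisher = new PollingRecordPublisher(source);
        // Drain the shard, as a ShardConsumer-style loop would.
        while (publisher.run(batch -> System.out.println("Got batch: " + batch))
                == RecordPublisherRunResult.INCOMPLETE) {
            // keep polling until the publisher reports completion
        }
    }
}
```

The point of the indirection is that the consuming loop only depends on `RecordPublisher`, so a polling implementation and a future EFO (push-based) implementation can be swapped without changing `ShardConsumer`.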
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no (the dependency change is covered by FLINK-18483)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes (added a check to filter de-aggregated subsequence numbers, `ShardConsumer::filterDeaggregatedRecord`)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes (restarting consumption from a sequence number)
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
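The per-record subsequence filtering referenced above (restarting from within an aggregated Kinesis record) can be illustrated roughly as below. This is a simplified sketch, not the actual `ShardConsumer` code: the `UserRecord` shape and the method name used here are assumptions for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified illustration of discarding already-processed de-aggregated
// subsequence records when restarting from within an aggregated record.
public class SubsequenceFilterSketch {

    /** A de-aggregated record: (sequenceNumber, subSequenceNumber). */
    record UserRecord(String sequenceNumber, long subSequenceNumber) {}

    /**
     * Drop records that were already processed before the restart: any record
     * belonging to the restart sequence number whose subsequence number is at
     * or below the last-processed subsequence number.
     */
    static List<UserRecord> filterDeaggregatedRecords(
            List<UserRecord> records,
            String lastSequenceNumber,
            long lastSubSequenceNumber) {
        return records.stream()
                .filter(r -> !r.sequenceNumber().equals(lastSequenceNumber)
                        || r.subSequenceNumber() > lastSubSequenceNumber)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<UserRecord> batch = List.of(
                new UserRecord("seq-1", 0),
                new UserRecord("seq-1", 1),
                new UserRecord("seq-1", 2),
                new UserRecord("seq-2", 0));

        // Restarting from seq-1 / subsequence 1: seq-1/0 and seq-1/1 are dropped.
        System.out.println(filterDeaggregatedRecords(batch, "seq-1", 1));
    }
}
```

This check runs on every record during continuous consumption, which is the small overhead the change log trades for keeping individual `RecordPublisher` implementations free of restart logic.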
