dannycranmer opened a new pull request #12881:
URL: https://github.com/apache/flink/pull/12881


   ## What is the purpose of the change
   
   This is the second milestone of 
[FLIP-128](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)
 to add EFO support to the `FlinkKinesisConsumer`. The consumption mechanism in `ShardConsumer` has been abstracted behind an interface and implemented to retain the existing behaviour; an EFO implementation will be added in a future contribution. This change should not introduce any functional differences.
   
   ## Note
   This PR is blocked by:
   - Test coverage improvements:
     - [JIRA issue](https://issues.apache.org/jira/browse/FLINK-18483)
     - [Pull request](https://github.com/apache/flink/pull/12850)
   
   ## Brief change log
   
   - `RecordPublisher`
     - Added a new interface used to consume records from Kinesis and supply them to Flink (see the interface sketch after this list)
   - `PollingRecordPublisher`
     - The existing consumption mechanism using `AWS::GetRecords` and `AWS::GetShardIterator` has been refactored into an implementation of `RecordPublisher`. A factory has been added to create instances
     - Configuration that was previously deserialised in the constructor has been moved into a new class, `PollingRecordPublisherConfiguration`
   - `AdaptivePollingRecordPublisher`
     - When a user enables `useAdaptiveReads`, a special `RecordPublisher` is created that adds an adaptive loop delay and batch size (see the adaptive read sketch after this list). This functionality has been moved from `ShardConsumer`, but now has its own implementation that extends `PollingRecordPublisher`
   - `StartingPosition`
     - A new object has been created to encapsulate the starting sequence number/iterator type (see the sketch after this list). A starting position can be created from a `SentinelSequenceNumber`
   - `ShardMetricsReporter`
     - Metric reporting has been split to allow arbitrary metrics to be registered by `RecordPublisher` implementations (see the metrics sketch after this list)
     - `ShardConsumer` reports general metrics using `ShardConsumerMetricsReporter`
     - `PollingRecordPublisher` reports metrics using `PollingRecordPublisherMetricsReporter`
   - `ShardConsumer`
     - Refactored to accept a `RecordPublisher` and not depend on a `KinesisProxyInterface`
     - The logic for restarting from an aggregated sequence number has been moved to support generic `RecordPublisher` implementations (see the filtering sketch after this list):
       - Before: at startup, a single record was downloaded and already processed subsequences were discarded
       - After: during consumption, any already processed subsequences are discarded
       - Note: this adds a small overhead to continuous consumption, but simplifies `RecordPublisher` implementations. An alternative would be to have `ShardConsumer` handle restarting from aggregated records via a standard `KinesisProxy`
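
   For context, the new interface is roughly the following shape. This is an illustrative sketch rather than the exact code in this PR; the run-result enum and batch-consumer callback shown here are assumptions:

   ```java
   /** Illustrative sketch of the RecordPublisher abstraction. */
   public interface RecordPublisher {

       /**
        * Publishes the next batch of records to the supplied consumer and
        * reports whether the shard has been fully consumed.
        */
       RecordPublisherRunResult run(RecordBatchConsumer consumer) throws InterruptedException;

       /** Receives a batch and returns the latest sequence number processed. */
       interface RecordBatchConsumer {
           SequenceNumber accept(RecordBatch batch);
       }

       enum RecordPublisherRunResult {
           /** The shard has been fully consumed (e.g. it was closed). */
           COMPLETE,
           /** There may be more records to consume. */
           INCOMPLETE
       }
   }
   ```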
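
   The adaptive read behaviour targets the Kinesis per-shard read limit of 2 MB/sec by resizing the `GetRecords` batch from the observed run loop time and average record size. A minimal sketch of this kind of calculation, with illustrative names (not the exact implementation):

   ```java
   /** Illustrative sketch of adaptive GetRecords batch sizing. */
   public final class AdaptiveReadsSketch {

       // Kinesis per-shard read limit: 2 MB/sec; GetRecords cap: 10,000 records
       private static final long SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;
       private static final int GET_RECORDS_MAX = 10_000;

       /** Recomputes the batch size from the last run loop's statistics. */
       public static int adaptMaxRecordsPerFetch(
               final long runLoopTimeNanos,
               final int numRecords,
               final long batchSizeBytes,
               final int currentMaxRecordsPerFetch) {
           if (numRecords == 0 || runLoopTimeNanos == 0) {
               return currentMaxRecordsPerFetch;
           }
           final long averageRecordSizeBytes = batchSizeBytes / numRecords;
           // How many run loops complete per second at the observed loop time
           final double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
           // Bytes we may read per loop without exceeding the shard read limit
           final double bytesPerRead = SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
           final int maxRecords = (int) (bytesPerRead / averageRecordSizeBytes);
           // Clamp: at least 1 record, at most the GetRecords service limit
           return Math.max(1, Math.min(maxRecords, GET_RECORDS_MAX));
       }
   }
   ```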
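
   A sketch of what `StartingPosition` encapsulates; the field and factory names below are illustrative assumptions, not the exact API:

   ```java
   import com.amazonaws.services.kinesis.model.ShardIteratorType;

   import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
   import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;

   /**
    * Illustrative sketch: pairs a shard iterator type with its starting
    * marker (a sequence number or timestamp, depending on the type).
    */
   public class StartingPosition {

       private final ShardIteratorType shardIteratorType;
       private final Object startingMarker;

       private StartingPosition(final ShardIteratorType shardIteratorType, final Object startingMarker) {
           this.shardIteratorType = shardIteratorType;
           this.startingMarker = startingMarker;
       }

       // Hypothetical factory: maps sentinel values to iterator types, e.g.
       // SENTINEL_EARLIEST_SEQUENCE_NUM -> TRIM_HORIZON,
       // SENTINEL_LATEST_SEQUENCE_NUM -> LATEST
       public static StartingPosition fromSentinelSequenceNumber(final SequenceNumber sequenceNumber) {
           if (SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get().equals(sequenceNumber)) {
               return new StartingPosition(ShardIteratorType.TRIM_HORIZON, null);
           }
           return new StartingPosition(ShardIteratorType.LATEST, null);
       }
   }
   ```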
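
   A sketch of the metrics split: each reporter registers its own gauges on the shard's metric group, so `RecordPublisher` implementations can expose extra metrics without `ShardConsumer` knowing about them. The gauge registration shown is illustrative (using `millisBehindLatest` as an example metric), not the full reporter:

   ```java
   import org.apache.flink.metrics.Gauge;
   import org.apache.flink.metrics.MetricGroup;

   /** Illustrative sketch of a reporter owning its own gauge registrations. */
   public class ShardConsumerMetricsReporter {

       private volatile long millisBehindLatest = -1;

       public ShardConsumerMetricsReporter(final MetricGroup metricGroup) {
           // Register the gauge once; the setter below updates the value
           metricGroup.gauge("millisBehindLatest", (Gauge<Long>) () -> millisBehindLatest);
       }

       public void setMillisBehindLatest(final long millisBehindLatest) {
           this.millisBehindLatest = millisBehindLatest;
       }
   }
   ```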
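
   The new per-record check, `ShardConsumer::filterDeaggregatedRecord` (also called out under the runtime code paths question below), roughly amounts to the following; this is a sketch of the idea, not the exact code:

   ```java
   // Illustrative sketch of the per-record check inside ShardConsumer.
   // `lastSequenceNum` is the consumer's restored / most recently emitted
   // position; UserRecord is the KCL de-aggregated record type.
   private SequenceNumber lastSequenceNum;

   /** Returns true if the record was already processed and should be skipped. */
   private boolean filterDeaggregatedRecord(final UserRecord record) {
       if (!lastSequenceNum.isAggregated()) {
           return false;
       }
       // Skip only when restarting part-way through an aggregated record:
       // same parent sequence number, and the subsequence was already emitted
       return record.getSequenceNumber().equals(lastSequenceNum.getSequenceNumber())
               && record.getSubSequenceNumber() <= lastSequenceNum.getSubSequenceNumber();
   }
   ```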
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as:
   - `ShardConsumerTest`
   
   A preceding pull request was submitted to increase test coverage before 
performing the refactor:
   - https://github.com/apache/flink/pull/12850
   
   This change added tests and can be verified as follows:
   - `RecordBatch` -> `RecordBatchTest`
   - `PollingRecordPublisherConfiguration` -> 
`PollingRecordPublisherConfigurationTest`
   - `PollingRecordPublisherFactory` -> `PollingRecordPublisherFactoryTest`
   - `PollingRecordPublisher` -> `PollingRecordPublisherTest`
   - `PollingRecordPublisherMetricsReporter` -> 
`PollingRecordPublisherMetricsReporterTest`
   - `ShardConsumerMetricsReporter` -> `ShardConsumerMetricsReporterTest`
   - `StartingPosition` -> `StartingPositionTest`
   
   I have deployed Flink applications locally and on AWS Kinesis Data Analytics for Java (KDA) and verified consumption.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no (dependency 
change is covered by FLINK-18483)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes (a per-record check was added to filter already processed de-aggregated subsequence numbers, `ShardConsumer::filterDeaggregatedRecord`)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes (restarting 
consumption from sequence number)
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   

