[ 
https://issues.apache.org/jira/browse/FLINK-10422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627458#comment-16627458
 ] 

ASF GitHub Bot commented on FLINK-10422:
----------------------------------------

EugeneYushin opened a new pull request #6760: [FLINK-10422] Follow AWS specs in 
Kinesis Consumer
URL: https://github.com/apache/flink/pull/6760
 
 
   ## What is the purpose of the change
   
   Align the Flink Kinesis connector with the AWS specs for shard ID 
conventions.
   Related Jira story: https://issues.apache.org/jira/browse/FLINK-10422
   Mailing list conversation: 
https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E
   
   ## Brief change log
     - Remove the custom ShardId comparator logic, which is redundant; rely on 
the AWS client libs to list shards after an exclusive start shard id
     - Remove test-related cleanup from production code
     - Add tests checking that the correct shards are returned on the second 
run of the `listShards` method in 2 cases: new shards are present, no new shards
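
   A minimal sketch of the exclusive-start behaviour these tests rely on. It 
is an in-memory stand-in, not the connector code or the AWS client: the class 
`ExclusiveStartShardListing`, its `listShards` helper, and the choice to throw 
on an unknown start id are all assumptions made for illustration. The shard 
order is simply the list order here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory stand-in for a stream; illustration only. */
public class ExclusiveStartShardListing {

    /**
     * Returns the shard ids that follow exclusiveStartShardId in the stream's
     * own order. A null start id lists everything. The ids are never parsed
     * or compared, so any id format works.
     */
    public static List<String> listShards(List<String> streamShards,
                                          String exclusiveStartShardId) {
        if (exclusiveStartShardId == null) {
            return new ArrayList<>(streamShards);
        }
        int index = streamShards.indexOf(exclusiveStartShardId);
        if (index < 0) {
            // Assumption of this sketch: unknown ids are rejected.
            throw new IllegalArgumentException(
                "Unknown shard id: " + exclusiveStartShardId);
        }
        return new ArrayList<>(
            streamShards.subList(index + 1, streamShards.size()));
    }

    public static void main(String[] args) {
        List<String> stream = new ArrayList<>(
            Arrays.asList("shardId-000000000000", "shardId-000000000001"));

        // First run: list everything and remember the last shard seen.
        List<String> firstRun = listShards(stream, null);
        String lastSeen = firstRun.get(firstRun.size() - 1);

        // Second run, no new shards: nothing comes back.
        System.out.println(listShards(stream, lastSeen));

        // Second run after a new shard appears: only the new id comes back.
        stream.add("shard.custom_2");
        System.out.println(listShards(stream, lastSeen));
    }
}
```

   Because only ids after the start id come back, the caller needs no 
comparator to filter out shards it has already seen.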
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - Run 
`testGetShardListWithNewShardsOnSecondRun/testGetShardWithNoNewShards` unit 
tests in `KinesisProxyTest.class`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no



> Follow AWS specs in Kinesis Consumer 
> -------------------------------------
>
>                 Key: FLINK-10422
>                 URL: https://issues.apache.org/jira/browse/FLINK-10422
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.6.1
>            Reporter: eugen yushin
>            Priority: Major
>              Labels: pull-request-available
>
> *Related conversation in mailing list:*
> [https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E]
> *Summary:*
> The Flink Kinesis consumer checks shard IDs against a particular pattern:
> {noformat}
> "^shardId-\\d{12}"
> {noformat}
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132]
> While this is in line with the current Kinesis Streams server implementation 
> (all streams follow this pattern), it conflicts with the AWS docs:
>  
> {code:java}
> ShardId
>  The unique identifier of the shard within the stream.
>  Type: String
>  Length Constraints: Minimum length of 1. Maximum length of 128.
>  Pattern: [a-zA-Z0-9_.-]+
>  Required: Yes
> {code}
>  
> [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html]
> *Intention:*
>  We have no guarantees and can't rely on patterns other than those provided 
> in the AWS docs.
>  Any custom implementation of a Kinesis mock should rely on the AWS docs, 
> which only require a ShardId to consist of alphanumerics, underscores, dots, 
> and hyphens. The stricter check prevents anyone from using Flink with such 
> mocks.
> The particular pattern "^shardId-\\d{12}" is used to build Flink's custom 
> shard comparator, filter out already seen shards, and pass only the latest 
> shard to client.listShards, limiting the scope of the RPC call to AWS.
> At the same time, I think we can get rid of this logic altogether. Its 
> current uses in the project are:
>  - work around a Kinesalite bug (I've already opened an issue to cover this:
>  [https://github.com/mhart/kinesalite/issues/76] and a PR: 
> [https://github.com/mhart/kinesalite/pull/77]). We can move this logic to 
> the test code base to keep production code clean for now
>  
> [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464]
>  - adjust the last seen shard id. We can simply omit this because the AWS 
> client won't return already seen shards, so we will get new ids only or nothing.
> [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L475]
>  
> [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L406]
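
The mismatch between the two patterns quoted in the issue can be checked with
a minimal Java sketch. The class and method names below are hypothetical, and
full-string matching is assumed for both patterns; the sample ids are made up
for illustration.

```java
import java.util.regex.Pattern;

/** Hypothetical helper comparing the two shard id patterns from the issue. */
public class ShardIdPatternCheck {

    // Pattern the issue quotes as the one currently checked by the consumer.
    static final Pattern CONNECTOR_PATTERN = Pattern.compile("^shardId-\\d{12}");

    // Pattern quoted from the AWS docs for ShardId (length 1 to 128).
    static final Pattern AWS_PATTERN = Pattern.compile("[a-zA-Z0-9_.-]+");

    public static boolean matchesConnectorPattern(String id) {
        return id != null && CONNECTOR_PATTERN.matcher(id).matches();
    }

    public static boolean matchesAwsSpec(String id) {
        return id != null
            && id.length() >= 1
            && id.length() <= 128
            && AWS_PATTERN.matcher(id).matches();
    }

    public static void main(String[] args) {
        String[] ids = {"shardId-000000000001", "mock-shard_1.a", "shard id"};
        for (String id : ids) {
            System.out.printf("%-22s connector=%b aws=%b%n",
                id, matchesConnectorPattern(id), matchesAwsSpec(id));
        }
    }
}
```

An id such as `mock-shard_1.a` satisfies the AWS constraints but not the
stricter pattern, which is the case a spec-conforming mock could hit.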



