[
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15243991#comment-15243991
]
Tzu-Li (Gordon) Tai commented on FLINK-3229:
--------------------------------------------
(Duplicate of my comment on FLINK-3211, also posted here to keep this issue
updated.)
https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis
Here is the initial working version of FlinkKinesisConsumer that I have been
testing in non-production environments, updated for the recent Flink 1.0
changes.
I'm still refactoring the code a bit to make unit testing easier. The PR will
follow very soon.
> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> ------------------------------------------------------------------------------
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement the data source consumer for the Kinesis
> streaming connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties config = new Properties();
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_RETRIES, "3");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_BACKOFF_MILLIS, "1000");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_START_POSITION_TYPE, "latest");
> config.put(FlinkKinesisConsumer.CONFIG_AWS_REGION, "us-east-1");
> AWSCredentialsProvider credentials = // credentials API in AWS SDK
> DataStream<T> kinesisRecords = env
>     .addSource(new FlinkKinesisConsumer<>(
>         listOfStreams, credentials, new SimpleStringSchema(), config));
> {code}
> I am still considering which read start positions to support
> ("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER"). The discussion can be
> found at https://issues.apache.org/jira/browse/FLINK-3211.
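
To make the start-position question concrete, here is a minimal sketch (not the connector's actual code) of how the configured value could be validated against the three candidates. The class name, the enum, the key string, and the default of LATEST when the key is unset are all assumptions made for illustration. Matching is case-insensitive because the example above passes "latest" while the candidates are written in upper case.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch; none of these names come from the actual connector.
class StartPositionSketch {

    // The three candidate start positions named in the issue description.
    enum StartPosition { TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER }

    // Assumed key string; the real value behind
    // CONFIG_STREAM_START_POSITION_TYPE is not given in the issue.
    static final String START_POSITION_KEY = "example.stream.start.position.type";

    // Reads the start position from the config, ignoring case.
    // Falling back to LATEST when the key is unset is an assumption.
    static StartPosition parse(Properties config) {
        String raw = config.getProperty(START_POSITION_KEY);
        if (raw == null) {
            return StartPosition.LATEST;
        }
        try {
            return StartPosition.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Fail early with a message that names the offending value.
            throw new IllegalArgumentException(
                "Unsupported start position type: " + raw, e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(START_POSITION_KEY, "latest");
        System.out.println(parse(config)); // prints LATEST
    }
}
```

Rejecting unknown values when the consumer is constructed, rather than on the first read, would surface a misconfigured job before it is deployed.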