[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116108#comment-16116108 ]
ASF GitHub Bot commented on FLINK-7367:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4473#discussion_r131577895

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -169,14 +169,27 @@ public void open(Configuration parameters) throws Exception {
         producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
         producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    -    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
    -        producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
    -                ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    -    }
    -    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
    -        producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
    -                ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    -    }
    +
    +    producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
    +            ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    +
    +    producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
    +            ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    +
    +    producerConfig.setMaxConnections(PropertiesUtil.getLong(configProps,
    +            ProducerConfigConstants.MAX_CONNECTIONS, producerConfig.getMaxConnections(), LOG));
    +
    +    producerConfig.setRateLimit(PropertiesUtil.getLong(configProps,
    +            ProducerConfigConstants.RATE_LIMIT, producerConfig.getRateLimit(), LOG));
    --- End diff --

    Starting from this line, the indentation is not consistent.

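The diff above drops the containsKey guards and passes the producer's current value as the fallback instead. A minimal sketch of why that is behavior-preserving, assuming PropertiesUtil.getLong returns the supplied default when the key is absent or unparsable. The class ConfigFallbackSketch, its getLong helper, the property keys, and the default values 24 and 150 are hypothetical stand-ins for illustration, not Flink or KPL code:

```java
import java.util.Properties;

// Hypothetical stand-in for the fallback pattern used in the diff; not Flink code.
final class ConfigFallbackSketch {

    /** Returns the long stored under key, or defaultValue if the key is absent or not a valid long. */
    static long getLong(Properties props, String key, long defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null) {
            // Key not set by the user: keep the current (default) value.
            return defaultValue;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            // Assumption: a malformed value falls back to the default rather than failing.
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Properties configProps = new Properties();
        configProps.setProperty("maxConnections", "48"); // hypothetical key

        // Set by the user, so the configured value wins.
        long maxConnections = getLong(configProps, "maxConnections", 24L);
        // Not set, so the fallback is returned and the setter becomes a no-op.
        long rateLimit = getLong(configProps, "rateLimit", 150L);

        System.out.println(maxConnections + " " + rateLimit); // prints "48 150"
    }
}
```

Because the fallback is the value the producer config already holds, calling the setter unconditionally leaves unset options unchanged, which is what makes the explicit containsKey check redundant under this assumption.
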
> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7367
>                 URL: https://issues.apache.org/jira/browse/FLINK-7367
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html]
> and [their sample on
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set more configs to make the most of KinesisProducer and to make it
> fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> We need to parameterize FlinkKinesisProducer so that the above params can be
> passed in, in order to cater to our needs.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)