[
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121143#comment-16121143
]
ASF GitHub Bot commented on FLINK-7367:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4473#discussion_r132373446
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
---
@@ -165,17 +167,10 @@ public void
setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- KinesisProducerConfiguration producerConfig = new
KinesisProducerConfiguration();
-
-
producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
- if
(configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
-
producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
-
ProducerConfigConstants.COLLECTION_MAX_COUNT,
producerConfig.getCollectionMaxCount(), LOG));
- }
- if
(configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
-
producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
-
ProducerConfigConstants.AGGREGATION_MAX_COUNT,
producerConfig.getAggregationMaxCount(), LOG));
+ // Override KPL default value if it's not specified by user
+ if
(!configProps.containsKey(ProducerConfigConstants.RATE_LIMIT)) {
+
producerConfig.setRateLimit(ProducerConfigConstants.DEFAULT_RATE_LIMIT);
--- End diff --
Ah I see why now, just read your Javadocs :)
Should we move this override to
`KinesisConfigUtil.validateProducerConfiguration`, where the producer config is
built?
> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime,
> MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.3.3
>
>
> Right now, FlinkKinesisProducer only expose two configs for the underlying
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> Well, according to [AWS
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html]
> and [their sample on
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set more to make the max use of KinesisProducer, and make it
> fault-tolerant (e.g. by increasing timeout).
> I select a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> We need to parameterize FlinkKinesisProducer to pass in the above params, in
> order to cater to our need
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)