[FLINK-7367] [kinesis] Generalize configuration for FlinkKinesisProducer properties
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed5d9a1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed5d9a1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed5d9a1 Branch: refs/heads/master Commit: 9ed5d9a180dcd871e33bf8982434e3afd90ed295 Parents: 98737f9 Author: Bowen Li <bowenl...@gmail.com> Authored: Thu Aug 3 20:59:02 2017 -0700 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Thu Sep 7 12:54:27 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/kinesis.md | 27 +++++--- .../kinesis/FlinkKinesisProducer.java | 25 ++----- .../kinesis/config/ProducerConfigConstants.java | 16 ++++- .../kinesis/util/KinesisConfigUtil.java | 48 +++++++++++-- .../kinesis/FlinkKinesisConsumerTest.java | 72 -------------------- .../kinesis/util/KinesisConfigUtilTest.java | 66 ++++++++++++++++++ 6 files changed, 147 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/docs/dev/connectors/kinesis.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 5fbf24b..1eea308 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -256,23 +256,29 @@ consumer when calling this API can also be modified by using the other keys pref ## Kinesis Producer -The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in -Flink's checkpointing and doesn't provide exactly-once processing guarantees. -Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). +The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream. + +Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard. -For the monitoring to work, the user accessing the stream needs access to the Cloud watch service. +For the monitoring to work, the user accessing the stream needs access to the CloudWatch service. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} Properties producerConfig = new Properties(); +// Required configs producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +// Optional configs +producerConfig.put("AggregationMaxCount", "4294967295"); +producerConfig.put("CollectionMaxCount", "1000"); +producerConfig.put("RecordTtl", "30000"); +producerConfig.put("RequestTimeout", "6000"); FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig); kinesis.setFailOnError(true); @@ -286,9 +292,15 @@ simpleStringStream.addSink(kinesis); <div data-lang="scala" markdown="1"> {% highlight scala %} val producerConfig = new Properties(); +// Required configs producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1"); producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); +// Optional configs +producerConfig.put("AggregationMaxCount", "4294967295"); +producerConfig.put("CollectionMaxCount", "1000"); +producerConfig.put("RecordTtl", "30000"); +producerConfig.put("RequestTimeout", "6000"); val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig); kinesis.setFailOnError(true); @@ -301,15 +313,14 @@ simpleStringStream.addSink(kinesis); </div> </div> -The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties` -instance as described above for the consumer. The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1". +The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1". + +If users don't specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default value to 100. Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream. Otherwise, the returned stream name is used. -Other optional configuration keys for the producer can be found in `ProducerConfigConstants`. - ## Using Non-AWS Kinesis Endpoints for Testing http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java index e0d3e38..1f5e64c 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java @@ -19,13 +19,11 @@ package org.apache.flink.streaming.connectors.kinesis; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.PropertiesUtil; import com.amazonaws.services.kinesis.producer.Attempt; import com.amazonaws.services.kinesis.producer.KinesisProducer; @@ -91,7 +89,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> { * This is a constructor supporting Flink's {@see SerializationSchema}. * * @param schema Serialization schema for the data type - * @param configProps The properties used to configure AWS credentials and AWS region + * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region */ public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) { @@ -116,13 +114,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> { * This is a constructor supporting {@see KinesisSerializationSchema}. * * @param schema Kinesis serialization schema for the data type - * @param configProps The properties used to configure AWS credentials and AWS region + * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region */ public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) { - this.configProps = checkNotNull(configProps, "configProps can not be null"); - - // check the configuration properties for any conflicting settings - KinesisConfigUtil.validateProducerConfiguration(this.configProps); + checkNotNull(configProps, "configProps can not be null"); + this.configProps = KinesisConfigUtil.replaceDeprecatedProducerKeys(configProps); checkNotNull(schema, "serialization schema cannot be null"); checkArgument( @@ -174,18 +170,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> { public void open(Configuration parameters) throws Exception { super.open(parameters); - KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); - - producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION)); + // check and pass the configuration properties + KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps); producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); - if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) { - producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps, - ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG)); - } - if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) { - producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps, - ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG)); - } producer = new KinesisProducer(producerConfig); callback = new FutureCallback<UserRecordResult>() { http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java index d131150..983687e 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java @@ -20,14 +20,24 @@ package org.apache.flink.streaming.connectors.kinesis.config; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; /** + * @deprecated + * * Optional producer specific configuration keys for {@link FlinkKinesisProducer}. */ +@Deprecated public class ProducerConfigConstants extends AWSConfigConstants { - /** Maximum number of items to pack into an PutRecords request. **/ + /** + * @deprecated + * + * Deprecated key. **/ + @Deprecated public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount"; - /** Maximum number of items to pack into an aggregated record. **/ + /** + * @deprecated + * + * Deprecated key. **/ + @Deprecated public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount"; - } http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 42f1af0..997191c 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -38,6 +39,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Utilities for Flink Kinesis connector configuration. */ public class KinesisConfigUtil { + + /** Maximum number of items to pack into an PutRecords request. **/ + protected static final String COLLECTION_MAX_COUNT = "CollectionMaxCount"; + + /** Maximum number of items to pack into an aggregated record. **/ + protected static final String AGGREGATION_MAX_COUNT = "AggregationMaxCount"; + + /** Limits the maximum allowed put rate for a shard, as a percentage of the backend limits. + * The default value is set as 100% in Flink. KPL's default value is 150% but it makes KPL throw + * RateLimitExceededException too frequently and breaks Flink sink as a result. + **/ + private static final String RATE_LIMIT = "RateLimit"; + + /** Default values for RateLimit. **/ + private static final String DEFAULT_RATE_LIMIT = "100"; + /** * Validate configuration properties for {@link FlinkKinesisConsumer}. */ @@ -127,18 +144,39 @@ public class KinesisConfigUtil { } /** + * Replace deprecated configuration properties for {@link FlinkKinesisProducer}. + * This should be remove along with deprecated keys + */ + public static Properties replaceDeprecatedProducerKeys(Properties configProps) { + // Replace deprecated key + if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) { + configProps.setProperty(COLLECTION_MAX_COUNT, + configProps.getProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT)); + configProps.remove(ProducerConfigConstants.COLLECTION_MAX_COUNT); + } + // Replace deprecated key + if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) { + configProps.setProperty(AGGREGATION_MAX_COUNT, + configProps.getProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT)); + configProps.remove(ProducerConfigConstants.AGGREGATION_MAX_COUNT); + } + return configProps; + } + + /** * Validate configuration properties for {@link FlinkKinesisProducer}. */ - public static void validateProducerConfiguration(Properties config) { + public static KinesisProducerConfiguration validateProducerConfiguration(Properties config) { checkNotNull(config, "config can not be null"); validateAwsConfiguration(config); - validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT, - "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value."); + // Override KPL default value if it's not specified by user + if (!config.containsKey(RATE_LIMIT)) { + config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT); + } - validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT, - "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value."); + return KinesisProducerConfiguration.fromProperties(config); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 6af4c62..4a007d5 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; -import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; @@ -46,7 +45,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.util.InstantiationUtil; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; @@ -507,76 +505,6 @@ public class FlinkKinesisConsumerTest { } // ---------------------------------------------------------------------- - // FlinkKinesisConsumer.validateProducerConfiguration() tests - // ---------------------------------------------------------------------- - - @Test - public void testUnparsableLongForCollectionMaxCountInConfig() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request"); - - Properties testConfig = new Properties(); - testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong"); - - KinesisConfigUtil.validateProducerConfiguration(testConfig); - } - - @Test - public void testUnparsableLongForAggregationMaxCountInConfig() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record"); - - Properties testConfig = new Properties(); - testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong"); - - KinesisConfigUtil.validateProducerConfiguration(testConfig); - } - - // ---------------------------------------------------------------------- - // Tests to verify serializability - // ---------------------------------------------------------------------- - - @Test - public void testCreateWithNonSerializableDeserializerFails() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("The provided deserialization schema is not serializable"); - - Properties testConfig = new Properties(); - testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisConsumer<>("test-stream", new NonSerializableDeserializationSchema(), testConfig); - } - - @Test - public void testCreateWithSerializableDeserializer() { - Properties testConfig = new Properties(); - testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - new FlinkKinesisConsumer<>("test-stream", new SerializableDeserializationSchema(), testConfig); - } - - @Test - public void testConsumerIsSerializable() { - Properties testConfig = new Properties(); - testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); - testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); - testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); - - FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), testConfig); - assertTrue(InstantiationUtil.isSerializable(consumer)); - } - - // ---------------------------------------------------------------------- // Tests related to state initialization // ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java new file mode 100644 index 0000000..d14ac04 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for KinesisConfigUtil. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class}) +public class KinesisConfigUtilTest { + @Rule + private ExpectedException exception = ExpectedException.none(); + + @Test + public void testUnparsableLongForProducerConfiguration() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'"); + + Properties testConfig = new Properties(); + testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty("RateLimit", "unparsableLong"); + + KinesisConfigUtil.validateProducerConfiguration(testConfig); + } + + @Test + public void testReplaceDeprecatedKeys() { + Properties testConfig = new Properties(); + testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1"); + testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2"); + Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig); + + assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT)); + assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT)); + } +}