[FLINK-7367] [kinesis] Generalize configuration for FlinkKinesisProducer properties


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed5d9a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed5d9a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed5d9a1

Branch: refs/heads/master
Commit: 9ed5d9a180dcd871e33bf8982434e3afd90ed295
Parents: 98737f9
Author: Bowen Li <bowenl...@gmail.com>
Authored: Thu Aug 3 20:59:02 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Thu Sep 7 12:54:27 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  | 27 +++++---
 .../kinesis/FlinkKinesisProducer.java           | 25 ++-----
 .../kinesis/config/ProducerConfigConstants.java | 16 ++++-
 .../kinesis/util/KinesisConfigUtil.java         | 48 +++++++++++--
 .../kinesis/FlinkKinesisConsumerTest.java       | 72 --------------------
 .../kinesis/util/KinesisConfigUtilTest.java     | 66 ++++++++++++++++++
 6 files changed, 147 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 5fbf24b..1eea308 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -256,23 +256,29 @@ consumer when calling this API can also be modified by using the other keys pref
 
 ## Kinesis Producer
 
-The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
-Flink's checkpointing and doesn't provide exactly-once processing guarantees.
-Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
+The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.
+
+Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
 
 In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
 
 To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
 
-For the monitoring to work, the user accessing the stream needs access to the Cloud watch service.
+For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Properties producerConfig = new Properties();
+// Required configs
 producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
 producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
 producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional configs
+producerConfig.put("AggregationMaxCount", "4294967295");
+producerConfig.put("CollectionMaxCount", "1000");
+producerConfig.put("RecordTtl", "30000");
+producerConfig.put("RequestTimeout", "6000");
 
 FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
 kinesis.setFailOnError(true);
@@ -286,9 +292,15 @@ simpleStringStream.addSink(kinesis);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val producerConfig = new Properties();
+// Required configs
 producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
 producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
 producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+// Optional configs
+producerConfig.put("AggregationMaxCount", "4294967295");
+producerConfig.put("CollectionMaxCount", "1000");
+producerConfig.put("RecordTtl", "30000");
+producerConfig.put("RequestTimeout", "6000");
 
 val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
 kinesis.setFailOnError(true);
@@ -301,15 +313,14 @@ simpleStringStream.addSink(kinesis);
 </div>
 </div>
 
-The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
-instance as described above for the consumer. The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".
+The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL instance underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing to a single Kinesis stream in the AWS region "us-east-1".
+
+If users don't specify any KPL configs, `FlinkKinesisProducer` will use KPL's default config values, except for `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150, but it makes KPL throw `RateLimitExceededException` too frequently and breaks the Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default and sets it to 100.
 
 Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is
 done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
 Otherwise, the returned stream name is used.
 
-Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
-
 
 ## Using Non-AWS Kinesis Endpoints for Testing
 

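The `RateLimit` override described in the documentation change above is just a conditional default applied to the properties map before it is handed to KPL. A minimal standalone sketch of that logic (class and method names here are hypothetical, not part of the patch):

```java
import java.util.Properties;

// Sketch of how a safer RateLimit default can be applied on top of
// user-supplied producer properties, as the docs describe.
public class RateLimitDefault {

    static final String RATE_LIMIT = "RateLimit";
    static final String DEFAULT_RATE_LIMIT = "100";

    // Apply the default only when the user has not set RateLimit;
    // an explicit user value always wins.
    static Properties withRateLimitDefault(Properties config) {
        if (!config.containsKey(RATE_LIMIT)) {
            config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT);
        }
        return config;
    }

    public static void main(String[] args) {
        System.out.println(withRateLimitDefault(new Properties()).getProperty(RATE_LIMIT)); // 100

        Properties custom = new Properties();
        custom.setProperty(RATE_LIMIT, "120");
        System.out.println(withRateLimitDefault(custom).getProperty(RATE_LIMIT)); // 120
    }
}
```

Any other KPL key set the same way (for example `RecordTtl` or `RequestTimeout`) passes through untouched, which is why unspecified settings fall back to KPL's own defaults.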
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index e0d3e38..1f5e64c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -19,13 +19,11 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.PropertiesUtil;
 
 import com.amazonaws.services.kinesis.producer.Attempt;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
@@ -91,7 +89,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
         * This is a constructor supporting Flink's {@see SerializationSchema}.
         *
         * @param schema Serialization schema for the data type
-        * @param configProps The properties used to configure AWS credentials and AWS region
+        * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
         */
        public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
 
@@ -116,13 +114,11 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
         * This is a constructor supporting {@see KinesisSerializationSchema}.
         *
         * @param schema Kinesis serialization schema for the data type
-        * @param configProps The properties used to configure AWS credentials and AWS region
+        * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
         */
        public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) {
-               this.configProps = checkNotNull(configProps, "configProps can not be null");
-
-               // check the configuration properties for any conflicting settings
-               KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+               checkNotNull(configProps, "configProps can not be null");
+               this.configProps = KinesisConfigUtil.replaceDeprecatedProducerKeys(configProps);
 
                checkNotNull(schema, "serialization schema cannot be null");
                checkArgument(
@@ -174,18 +170,9 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
 
-               KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
-
-               producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
+               // check and pass the configuration properties
+               KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
                producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
-               if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
-                       producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
-                                       ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
-               }
-               if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
-                       producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
-                                       ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
-               }
 
                producer = new KinesisProducer(producerConfig);
                callback = new FutureCallback<UserRecordResult>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
index d131150..983687e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -20,14 +20,24 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 
 /**
+ * @deprecated
+ *
  * Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
  */
+@Deprecated
 public class ProducerConfigConstants extends AWSConfigConstants {
 
-       /** Maximum number of items to pack into an PutRecords request. **/
+       /**
+        * @deprecated
+        *
+        * Deprecated key. **/
+       @Deprecated
        public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
 
-       /** Maximum number of items to pack into an aggregated record. **/
+       /**
+        * @deprecated
+        *
+        * Deprecated key. **/
+       @Deprecated
        public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 42f1af0..997191c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
 import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -38,6 +39,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
+
+       /** Maximum number of items to pack into a PutRecords request. **/
+       protected static final String COLLECTION_MAX_COUNT = "CollectionMaxCount";
+
+       /** Maximum number of items to pack into an aggregated record. **/
+       protected static final String AGGREGATION_MAX_COUNT = "AggregationMaxCount";
+
+       /** Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.
+        * The default value is set as 100% in Flink. KPL's default value is 150% but it makes KPL throw
+        * RateLimitExceededException too frequently and breaks the Flink sink as a result.
+        **/
+       private static final String RATE_LIMIT = "RateLimit";
+
+       /** Default value for RateLimit. **/
+       private static final String DEFAULT_RATE_LIMIT = "100";
+
        /**
         * Validate configuration properties for {@link FlinkKinesisConsumer}.
         */
@@ -127,18 +144,39 @@ public class KinesisConfigUtil {
        }
 
        /**
+        * Replace deprecated configuration properties for {@link FlinkKinesisProducer}.
+        * This should be removed along with the deprecated keys.
+        */
+       public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
+               // Replace deprecated key
+               if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
+                       configProps.setProperty(COLLECTION_MAX_COUNT,
+                                       configProps.getProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT));
+                       configProps.remove(ProducerConfigConstants.COLLECTION_MAX_COUNT);
+               }
+               // Replace deprecated key
+               if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
+                       configProps.setProperty(AGGREGATION_MAX_COUNT,
+                                       configProps.getProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT));
+                       configProps.remove(ProducerConfigConstants.AGGREGATION_MAX_COUNT);
+               }
+               return configProps;
+       }
+
+       /**
         * Validate configuration properties for {@link FlinkKinesisProducer}.
         */
-       public static void validateProducerConfiguration(Properties config) {
+       public static KinesisProducerConfiguration validateProducerConfiguration(Properties config) {
                checkNotNull(config, "config can not be null");
 
                validateAwsConfiguration(config);
 
-               validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT,
-                       "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value.");
+               // Override KPL default value if it's not specified by user
+               if (!config.containsKey(RATE_LIMIT)) {
+                       config.setProperty(RATE_LIMIT, DEFAULT_RATE_LIMIT);
+               }
 
-               validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT,
-                       "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value.");
+               return KinesisProducerConfiguration.fromProperties(config);
        }
 
        /**

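The deprecated-key migration added to `KinesisConfigUtil` above amounts to renaming two entries of a `Properties` map in place. A standalone sketch of the same idea (the class name and the key table are illustrative; the old/new key strings are taken from the patch):

```java
import java.util.Properties;

// Sketch: migrate old "aws.producer.*" keys to the KPL-native names.
public class DeprecatedKeyMigration {

    // Deprecated key -> KPL-native key, as renamed by the patch.
    static final String[][] RENAMES = {
        {"aws.producer.collectionMaxCount", "CollectionMaxCount"},
        {"aws.producer.aggregationMaxCount", "AggregationMaxCount"},
    };

    // Move each deprecated key's value to its new key and drop the old entry.
    static Properties replaceDeprecatedProducerKeys(Properties configProps) {
        for (String[] rename : RENAMES) {
            if (configProps.containsKey(rename[0])) {
                configProps.setProperty(rename[1], configProps.getProperty(rename[0]));
                configProps.remove(rename[0]);
            }
        }
        return configProps;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("aws.producer.aggregationMaxCount", "4294967295");
        replaceDeprecatedProducerKeys(config);
        System.out.println(config.getProperty("AggregationMaxCount")); // 4294967295
        System.out.println(config.containsKey("aws.producer.aggregationMaxCount")); // false
    }
}
```

Because the migration mutates and returns the same `Properties` instance, callers can keep passing their original object, exactly as the patched constructor does.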
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 6af4c62..4a007d5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
@@ -46,7 +45,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.InstantiationUtil;
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -507,76 +505,6 @@ public class FlinkKinesisConsumerTest {
        }
 
        // ----------------------------------------------------------------------
-       // FlinkKinesisConsumer.validateProducerConfiguration() tests
-       // ----------------------------------------------------------------------
-
-       @Test
-       public void testUnparsableLongForCollectionMaxCountInConfig() {
-               exception.expect(IllegalArgumentException.class);
-               exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request");
-
-               Properties testConfig = new Properties();
-               testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
-               testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-               testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-               testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong");
-
-               KinesisConfigUtil.validateProducerConfiguration(testConfig);
-       }
-
-       @Test
-       public void testUnparsableLongForAggregationMaxCountInConfig() {
-               exception.expect(IllegalArgumentException.class);
-               exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record");
-
-               Properties testConfig = new Properties();
-               testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
-               testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-               testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-               testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong");
-
-               KinesisConfigUtil.validateProducerConfiguration(testConfig);
-       }
-
-       // ----------------------------------------------------------------------
-       // Tests to verify serializability
-       // ----------------------------------------------------------------------
-
-       @Test
-       public void testCreateWithNonSerializableDeserializerFails() {
-               exception.expect(IllegalArgumentException.class);
-               exception.expectMessage("The provided deserialization schema is not serializable");
-
-               Properties testConfig = new Properties();
-               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-               new FlinkKinesisConsumer<>("test-stream", new NonSerializableDeserializationSchema(), testConfig);
-       }
-
-       @Test
-       public void testCreateWithSerializableDeserializer() {
-               Properties testConfig = new Properties();
-               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-               new FlinkKinesisConsumer<>("test-stream", new SerializableDeserializationSchema(), testConfig);
-       }
-
-       @Test
-       public void testConsumerIsSerializable() {
-               Properties testConfig = new Properties();
-               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
-               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
-               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
-
-               FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(), testConfig);
-               assertTrue(InstantiationUtil.isSerializable(consumer));
-       }
-
-       // ----------------------------------------------------------------------
        // Tests related to state initialization
        // ----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed5d9a1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
new file mode 100644
index 0000000..d14ac04
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for KinesisConfigUtil.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+public class KinesisConfigUtilTest {
+       @Rule
+       public ExpectedException exception = ExpectedException.none();
+
+       @Test
+       public void testUnparsableLongForProducerConfiguration() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty("RateLimit", "unparsableLong");
+
+               KinesisConfigUtil.validateProducerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testReplaceDeprecatedKeys() {
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
+               testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
+               Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);
+
+               assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT));
+               assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
+       }
+}
