Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4656#discussion_r139931718
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
---
@@ -50,7 +51,66 @@ public void testUnparsableLongForProducerConfiguration()
{
testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
testConfig.setProperty("RateLimit", "unparsableLong");
- KinesisConfigUtil.validateProducerConfiguration(testConfig);
+ KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+ }
+
+ @Test
+ public void testDefaultRateLimitInProducerConfiguration() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
+
+ KinesisProducerConfiguration kpc =
KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+ assertEquals(100, kpc.getRateLimit());
+ }
+
+ @Test
+ public void testCustomizedRateLimitInProducerConfiguration() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
+ testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
+
+ KinesisProducerConfiguration kpc =
KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+ assertEquals(150, kpc.getRateLimit());
+ }
+
+ @Test
+ public void testDefaultThreadingModelInProducerConfiguration() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.AWS_REGION,
"us-east-1");
+ KinesisProducerConfiguration kpc =
KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
+
+
assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED,
kpc.getThreadingModel());
+ }
+
+ @Test
+ public void testCustomizedThreadingModelSizeInProducerConfiguration() {
--- End diff --
nit: I think the "Size" in the test name is redundant here. I'll remove it.
---