Repository: flink
Updated Branches:
  refs/heads/release-1.5 de4f28308 -> 41e2b3e17


[FLINK-9188] [kinesis] Generic mechanism to set ClientConfiguration properties.

This closes #5889.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67ff336e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67ff336e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67ff336e

Branch: refs/heads/release-1.5
Commit: 67ff336e7a16c555f68b8a6b08a24ee502212372
Parents: de4f283
Author: Thomas Weise <[email protected]>
Authored: Mon Apr 16 22:01:52 2018 -0700
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Wed May 2 20:09:07 2018 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java  |  5 +-
 .../connectors/kinesis/util/AWSUtil.java        | 47 ++++++++++++
 .../BeanDeserializerModifierForIgnorables.java  | 79 ++++++++++++++++++++
 .../kinesis/proxy/KinesisProxyTest.java         | 15 ++++
 4 files changed, 145 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67ff336e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 3486206..09e9d4c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -186,7 +186,10 @@ public class KinesisProxy implements KinesisProxyInterface 
{
         * @return
         */
        protected AmazonKinesis createKinesisClient(Properties configProps) {
-               return AWSUtil.createKinesisClient(configProps, new 
ClientConfigurationFactory().getConfig());
+
+               ClientConfiguration awsClientConfig = new 
ClientConfigurationFactory().getConfig();
+               AWSUtil.setAwsClientConfigProperties(awsClientConfig, 
configProps);
+               return AWSUtil.createKinesisClient(configProps, 
awsClientConfig);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/67ff336e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 2e9090e..cfddfa2 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -35,7 +35,16 @@ import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory;
+import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
+import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
+import com.fasterxml.jackson.databind.deser.DeserializerFactory;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -159,4 +168,42 @@ public class AWSUtil {
                }
                return true;
        }
+
+       /**
+        * The prefix used for properties that should be applied to {@link 
ClientConfiguration}.
+        */
+       public static final String AWS_CLIENT_CONFIG_PREFIX = 
"aws.clientconfig.";
+
+       /**
+        * Set all prefixed properties on {@link ClientConfiguration}.
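+        * @param config the client configuration that is updated in place
+        * @param configProps the properties to scan for keys starting with the prefix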
+        */
+       public static void setAwsClientConfigProperties(ClientConfiguration 
config,
+                                                                               
                        Properties configProps) {
+
+               Map<String, Object> awsConfigProperties = new HashMap<>();
+               for (Map.Entry<Object, Object> entry : configProps.entrySet()) {
+                       String key = (String) entry.getKey();
+                       if (key.startsWith(AWS_CLIENT_CONFIG_PREFIX)) {
+                               
awsConfigProperties.put(key.substring(AWS_CLIENT_CONFIG_PREFIX.length()), 
entry.getValue());
+                       }
+               }
+               // Properties that Jackson cannot handle; they are excluded from deserialization
+               String[] ignorableProperties = {"secureRandom"};
+               BeanDeserializerModifier modifier = new 
BeanDeserializerModifierForIgnorables(
+                       ClientConfiguration.class, ignorableProperties);
+               DeserializerFactory factory = 
BeanDeserializerFactory.instance.withDeserializerModifier(
+                       modifier);
+               ObjectMapper mapper = new ObjectMapper(null, null,
+                       new DefaultDeserializationContext.Impl(factory));
+
+               JsonNode propTree = mapper.convertValue(awsConfigProperties, 
JsonNode.class);
+               try {
+                       mapper.readerForUpdating(config).readValue(propTree);
+               } catch (IOException ex) {
+                       throw new RuntimeException(ex);
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67ff336e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java
new file mode 100644
index 0000000..51b80f3
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder;
+import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
+import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Jackson bean deserializer utility that allows skipping of properties, for 
example because they
+ * cannot be handled by the default deserializer or should be ignored for other 
reasons.
+ *
+ * <p>Original source:
+ * 
https://stackoverflow.com/questions/12305438/jackson-dynamic-filtering-of-properties-during-deserialization
+ */
+public class BeanDeserializerModifierForIgnorables extends 
BeanDeserializerModifier {
+
+       private Class<?> type;
+       private List<String> ignorables;
+
+       public BeanDeserializerModifierForIgnorables(Class clazz, String... 
properties) {
+               ignorables = new ArrayList<>();
+               for (String property : properties) {
+                       ignorables.add(property);
+               }
+               this.type = clazz;
+       }
+
+       @Override
+       public BeanDeserializerBuilder updateBuilder(
+               DeserializationConfig config, BeanDescription beanDesc,
+               BeanDeserializerBuilder builder) {
+               if (!type.equals(beanDesc.getBeanClass())) {
+                       return builder;
+               }
+
+               for (String ignorable : ignorables) {
+                       builder.addIgnorable(ignorable);
+               }
+               return builder;
+       }
+
+       @Override
+       public List<BeanPropertyDefinition> updateProperties(
+               DeserializationConfig config, BeanDescription beanDesc,
+               List<BeanPropertyDefinition> propDefs) {
+               if (!type.equals(beanDesc.getBeanClass())) {
+                       return propDefs;
+               }
+
+               List<BeanPropertyDefinition> newPropDefs = new ArrayList<>();
+               for (BeanPropertyDefinition propDef : propDefs) {
+                       if (!ignorables.contains(propDef.getName())) {
+                               newPropDefs.add(propDef);
+                       }
+               }
+               return newPropDefs;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/67ff336e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index c84d89b..25f4381 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -86,4 +86,19 @@ public class KinesisProxyTest {
                assertEquals(10000, clientConfiguration.getSocketTimeout());
        }
 
+       @Test
+       public void testClientConfigOverride() {
+
+               Properties configProps = new Properties();
+               configProps.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
+               configProps.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + 
"socketTimeout", "9999");
+
+               KinesisProxyInterface proxy = KinesisProxy.create(configProps);
+
+               AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
+               ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
+                       "clientConfiguration");
+               assertEquals(9999, clientConfiguration.getSocketTimeout());
+       }
+
 }

Reply via email to