tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis] 
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r243472774
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ##########
 @@ -193,18 +193,32 @@ public static Properties 
replaceDeprecatedProducerKeys(Properties configProps) {
                return configProps;
        }
 
-       public static Properties replaceDeprecatedConsumerKeys(Properties 
configProps) {
-               HashMap<String, String> deprecatedOldKeyToNewKeys = new 
HashMap<>();
-               
deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
 ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
-               
deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
 ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
-               
deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
 ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
-               for (Map.Entry<String, String> entry : 
deprecatedOldKeyToNewKeys.entrySet()) {
-                       String deprecatedOldKey = entry.getKey();
+       /**
+        * <p>
+        *  A set of configuration parameters associated with the 
describeStreams API may be used if:
+        *      1) a legacy client wants to consume from Kinesis
+        *      2) a current client wants to consume from DynamoDB streams
+        *
+        * In the context of 1), the set of configurations needs to be 
translated to the corresponding
+        * configurations in the Kinesis listShards API. In the meantime, keep 
these configs since
+        * they may be useful in the context of 2), i.e., polling data from a 
DynamoDB stream.
+        * </p>
+        *
+        * @param configProps original config properties.
+        * @return backfilled config properties.
+        */
+       public static Properties backfillConsumerKeys(Properties configProps) {
+               HashMap<String, String> oldKeyToNewKeys = new HashMap<>();
+               
oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, 
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
+               
oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, 
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
+               
oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
 ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
+               for (Map.Entry<String, String> entry : 
oldKeyToNewKeys.entrySet()) {
+                       String oldKey = entry.getKey();
                        String newKey = entry.getValue();
-                       if (configProps.containsKey(deprecatedOldKey)) {
-                               LOG.warn("Please note {} property has been 
deprecated. Please use the {} new property key", deprecatedOldKey, newKey);
-                               configProps.setProperty(newKey, 
configProps.getProperty(deprecatedOldKey));
-                               configProps.remove(deprecatedOldKey);
+                       if (configProps.containsKey(oldKey)) {
+                               LOG.warn("Backfill the property key {} based on 
the original key {}", newKey, oldKey);
 
 Review comment:
   Remove this warning since it isn't applicable any more (the key is expected 
to be used for DynamoDB).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to