tweise commented on a change in pull request #6968: [FLINK-4582] [kinesis]
Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r243472774
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -193,18 +193,32 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
         return configProps;
     }
-    public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
-        HashMap<String, String> deprecatedOldKeyToNewKeys = new HashMap<>();
-        deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
-        deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
-        deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
-        for (Map.Entry<String, String> entry : deprecatedOldKeyToNewKeys.entrySet()) {
-            String deprecatedOldKey = entry.getKey();
+    /**
+     * <p>
+     * A set of configuration parameters associated with the describeStreams API may be used if:
+     * 1) a legacy client wants to consume from Kinesis
+     * 2) a current client wants to consume from DynamoDB streams
+     *
+     * In the context of 1), the set of configurations needs to be translated to the corresponding
+     * configurations in the Kinesis listShards API. In the meantime, keep these configs since
+     * they may be useful in the context of 2), i.e., polling data from a DynamoDB stream.
+     * </p>
+     *
+     * @param configProps original config properties.
+     * @return backfilled config properties.
+     */
+    public static Properties backfillConsumerKeys(Properties configProps) {
+        HashMap<String, String> oldKeyToNewKeys = new HashMap<>();
+        oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
+        oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
+        oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
+        for (Map.Entry<String, String> entry : oldKeyToNewKeys.entrySet()) {
+            String oldKey = entry.getKey();
             String newKey = entry.getValue();
-            if (configProps.containsKey(deprecatedOldKey)) {
-                LOG.warn("Please note {} property has been deprecated. Please use the {} new property key", deprecatedOldKey, newKey);
-                configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey));
-                configProps.remove(deprecatedOldKey);
+            if (configProps.containsKey(oldKey)) {
+                LOG.warn("Backfill the property key {} based on the original key {}", newKey, oldKey);
Review comment:
Remove this warning since it isn't applicable any more (the key is expected
to be used for DynamoDB).
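
For illustration only, here is a minimal sketch of what the backfill loop might look like once the warning is dropped. It is not the PR's actual code: the class name `BackfillSketch` is hypothetical, the two key strings are assumed stand-ins for the values in `ConsumerConfigConstants`, and the choice to keep the old key and unconditionally copy its value to the new key is an assumption based on the Javadoc in the hunk, since the rest of the method is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-alone sketch; not part of the Flink code base.
class BackfillSketch {

    // Assumed key strings for illustration; the real values live in ConsumerConfigConstants.
    static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

    static Properties backfillConsumerKeys(Properties configProps) {
        Map<String, String> oldKeyToNewKeys = new LinkedHashMap<>();
        oldKeyToNewKeys.put(STREAM_DESCRIBE_BACKOFF_BASE, LIST_SHARDS_BACKOFF_BASE);

        for (Map.Entry<String, String> entry : oldKeyToNewKeys.entrySet()) {
            String oldKey = entry.getKey();
            String newKey = entry.getValue();
            if (configProps.containsKey(oldKey)) {
                // No warning here: the old key is still a valid setting for DynamoDB streams.
                // Copy the value to the new key and leave the old key in place (assumption).
                configProps.setProperty(newKey, configProps.getProperty(oldKey));
            }
        }
        return configProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(STREAM_DESCRIBE_BACKOFF_BASE, "1000");
        backfillConsumerKeys(props);
        System.out.println(props.getProperty(LIST_SHARDS_BACKOFF_BASE));    // prints 1000
        System.out.println(props.containsKey(STREAM_DESCRIBE_BACKOFF_BASE)); // prints true
    }
}
```

Unlike the removed `replaceDeprecatedConsumerKeys`, this sketch never calls `configProps.remove(...)`, so a DynamoDB streams consumer could still read the describe-stream settings after the backfill.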