xiaolong-sn commented on a change in pull request #12920:
URL: https://github.com/apache/flink/pull/12920#discussion_r457984340



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
##########
@@ -169,6 +245,72 @@ public static void validateConsumerConfiguration(Properties config) {
                }
        }
 
+       /**
+        * Validate the record publisher type.
+        * @param config config properties
+        * @return if `ConsumerConfigConstants.RECORD_PUBLISHER_TYPE` is set, return the parsed record publisher type. Else return polling record publisher type.
+        */
+       public static RecordPublisherType validateRecordPublisherType(Properties config) {
+               if (config.containsKey(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE)) {
+                       String recordPublisherType = config.getProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE);
+
+                       // specified record publisher type in stream must be either EFO or POLLING
+                       try {
+                               return RecordPublisherType.valueOf(recordPublisherType);
+                       } catch (IllegalArgumentException e) {
+                               String errorMessage = Arrays.stream(RecordPublisherType.values())
+                                       .map(Enum::name).collect(Collectors.joining(", "));
+                               throw new IllegalArgumentException("Invalid record publisher type in stream set in config. Valid values are: " + errorMessage);
+                       }
+               } else {
+                       return RecordPublisherType.POLLING;
+               }
+       }
+
+       /**
+        * Validate if the given config is a valid EFO configuration.
+        * @param config  config properties.
+        * @param streams the streams which is sent to match the EFO consumer arn if the EFO registration mode is set to `NONE`.
+        */
+       public static void validateEfoConfiguration(Properties config, List<String> streams) {
+               String efoRegistrationType;

Review comment:
       I've got your point. That is brilliant.
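
For reference, the default-and-validate behaviour of `validateRecordPublisherType` in the hunk above can be tried in isolation. The following is only a standalone sketch, not the Flink code: the class name `RecordPublisherTypeSketch` and the property key string are assumptions (the actual value of `ConsumerConfigConstants.RECORD_PUBLISHER_TYPE` is not shown in the hunk), and the enum is redeclared locally with the two values named in the code comment.

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;

public class RecordPublisherTypeSketch {

    // Local stand-in for the connector's enum; values taken from the comment in the hunk.
    public enum RecordPublisherType { EFO, POLLING }

    // Hypothetical key, used only for this sketch.
    public static final String RECORD_PUBLISHER_TYPE = "example.record.publisher.type";

    public static RecordPublisherType validateRecordPublisherType(Properties config) {
        String value = config.getProperty(RECORD_PUBLISHER_TYPE);
        if (value == null) {
            // Key not set (or not a String): fall back to polling.
            return RecordPublisherType.POLLING;
        }
        try {
            return RecordPublisherType.valueOf(value);
        } catch (IllegalArgumentException e) {
            // List the accepted names so the caller can correct the config.
            String validValues = Arrays.stream(RecordPublisherType.values())
                    .map(Enum::name)
                    .collect(Collectors.joining(", "));
            throw new IllegalArgumentException(
                    "Invalid record publisher type in stream set in config. Valid values are: "
                            + validValues);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(validateRecordPublisherType(config)); // key unset

        config.setProperty(RECORD_PUBLISHER_TYPE, "EFO");
        System.out.println(validateRecordPublisherType(config)); // valid value

        config.setProperty(RECORD_PUBLISHER_TYPE, "efo");
        try {
            validateRecordPublisherType(config); // valueOf is case-sensitive
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One difference from the hunk: the sketch checks the result of `getProperty` for null instead of calling `containsKey` first, so a key holding a non-String value falls back to `POLLING` rather than reaching `valueOf` with null.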




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

