wuchong commented on a change in pull request #9976: [FLINK-14493][core] 
Introduce data types to ConfigOptions.
URL: https://github.com/apache/flink/pull/9976#discussion_r339610697
 
 

 ##########
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
 ##########
 @@ -850,99 +856,173 @@ private void loggingFallback(FallbackKey fallbackKey, 
ConfigOption<?> configOpti
        //  Type conversion
        // 
--------------------------------------------------------------------------------------------
 
-       private int convertToInt(Object o, int defaultValue) {
+       @SuppressWarnings("unchecked")
+       private <T> T convertValue(Object rawValue, Class clazz) {
+               if (Integer.class.equals(clazz)) {
+                       return (T) convertToInt(rawValue);
+               } else if (Long.class.equals(clazz)) {
+                       return (T) convertToLong(rawValue);
+               } else if (Boolean.class.equals(clazz)) {
+                       return (T) convertToBoolean(rawValue);
+               } else if (Float.class.equals(clazz)) {
+                       return (T) convertToFloat(rawValue);
+               } else if (Double.class.equals(clazz)) {
+                       return (T) convertToDouble(rawValue);
+               } else if (String.class.equals(clazz)) {
+                       return (T) convertToString(rawValue);
+               } else if (clazz.isEnum()) {
+                       return (T) convertToEnum(rawValue, clazz);
+               } else if (clazz == Duration.class) {
+                       return (T) convertToDuration(rawValue);
+               } else if (clazz == MemorySize.class) {
+                       return (T) convertToMemorySize(rawValue);
+               } else if (clazz == Map.class) {
+                       return (T) convertToProperties(rawValue);
+               }
+
+               throw new IllegalArgumentException("Unsupported type: " + 
clazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private <T> T convertToList(Object rawValue, Class atomicClass) {
+               if (rawValue instanceof List) {
+                       return (T) rawValue;
+               } else {
+                       return (T) 
StructuredOptionsSplitter.splitEscaped(rawValue.toString(), ';').stream()
+                               .map(s -> convertValue(s, atomicClass))
+                               .collect(Collectors.toList());
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private Map<String, String> convertToProperties(Object o) {
+               if (o instanceof Map) {
+                       return (Map<String, String>) o;
+               } else {
+                       List<String> listOfRawProperties = 
StructuredOptionsSplitter.splitEscaped(o.toString(), ',');
+                       return listOfRawProperties.stream()
+                               .map(s -> 
StructuredOptionsSplitter.splitEscaped(s, ':'))
+                               .map(pair -> {
+                                       if (pair.size() != 2) {
+                                               throw new 
IllegalArgumentException("Could not parse pair in the map " + pair);
+                                       }
+
+                                       return pair;
+                               })
+                               .collect(Collectors.toMap(
+                                       a -> a.get(0),
+                                       a -> a.get(1)
+                               ));
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private <E extends Enum<E>> E convertToEnum(Object o, Class<E> clazz) {
+               if (o.getClass().equals(clazz)) {
+                       return (E) o;
+               }
+
+               return Enum.valueOf(clazz, 
o.toString().toUpperCase(Locale.ROOT));
+       }
+
+       private Duration convertToDuration(Object o) {
+               if (o.getClass() == Duration.class) {
+                       return (Duration) o;
+               }
+
+               return TimeUtils.parseDuration(o.toString());
+       }
+
+       private MemorySize convertToMemorySize(Object o) {
+               if (o.getClass() == MemorySize.class) {
+                       return (MemorySize) o;
+               }
+
+               return MemorySize.parse(o.toString());
 
 Review comment:
   Thanks for sharing the link. It makes sense to me. 
   Feel free to merge this :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to