dawidwys commented on a change in pull request #9976: [FLINK-14493][core] 
Introduce data types to ConfigOptions.
URL: https://github.com/apache/flink/pull/9976#discussion_r339487103
 
 

 ##########
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
 ##########
 @@ -850,99 +856,173 @@ private void loggingFallback(FallbackKey fallbackKey, 
ConfigOption<?> configOpti
        //  Type conversion
        // 
--------------------------------------------------------------------------------------------
 
-       private int convertToInt(Object o, int defaultValue) {
+       @SuppressWarnings("unchecked")
+       private <T> T convertValue(Object rawValue, Class clazz) {
+               if (Integer.class.equals(clazz)) {
+                       return (T) convertToInt(rawValue);
+               } else if (Long.class.equals(clazz)) {
+                       return (T) convertToLong(rawValue);
+               } else if (Boolean.class.equals(clazz)) {
+                       return (T) convertToBoolean(rawValue);
+               } else if (Float.class.equals(clazz)) {
+                       return (T) convertToFloat(rawValue);
+               } else if (Double.class.equals(clazz)) {
+                       return (T) convertToDouble(rawValue);
+               } else if (String.class.equals(clazz)) {
+                       return (T) convertToString(rawValue);
+               } else if (clazz.isEnum()) {
+                       return (T) convertToEnum(rawValue, clazz);
+               } else if (clazz == Duration.class) {
+                       return (T) convertToDuration(rawValue);
+               } else if (clazz == MemorySize.class) {
+                       return (T) convertToMemorySize(rawValue);
+               } else if (clazz == Map.class) {
+                       return (T) convertToProperties(rawValue);
+               }
+
+               throw new IllegalArgumentException("Unsupported type: " + 
clazz);
+       }
+
+       @SuppressWarnings("unchecked")
+       private <T> T convertToList(Object rawValue, Class atomicClass) {
+               if (rawValue instanceof List) {
+                       return (T) rawValue;
+               } else {
+                       return (T) 
StructuredOptionsSplitter.splitEscaped(rawValue.toString(), ';').stream()
+                               .map(s -> convertValue(s, atomicClass))
+                               .collect(Collectors.toList());
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private Map<String, String> convertToProperties(Object o) {
+               if (o instanceof Map) {
+                       return (Map<String, String>) o;
+               } else {
+                       List<String> listOfRawProperties = 
StructuredOptionsSplitter.splitEscaped(o.toString(), ',');
+                       return listOfRawProperties.stream()
+                               .map(s -> 
StructuredOptionsSplitter.splitEscaped(s, ':'))
+                               .map(pair -> {
+                                       if (pair.size() != 2) {
+                                               throw new 
IllegalArgumentException("Could not parse pair in the map " + pair);
+                                       }
+
+                                       return pair;
+                               })
+                               .collect(Collectors.toMap(
+                                       a -> a.get(0),
+                                       a -> a.get(1)
+                               ));
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private <E extends Enum<E>> E convertToEnum(Object o, Class<E> clazz) {
+               if (o.getClass().equals(clazz)) {
+                       return (E) o;
+               }
+
+               return Enum.valueOf(clazz, 
o.toString().toUpperCase(Locale.ROOT));
+       }
+
+       private Duration convertToDuration(Object o) {
+               if (o.getClass() == Duration.class) {
+                       return (Duration) o;
+               }
+
+               return TimeUtils.parseDuration(o.toString());
+       }
+
+       private MemorySize convertToMemorySize(Object o) {
+               if (o.getClass() == MemorySize.class) {
+                       return (MemorySize) o;
+               }
+
+               return MemorySize.parse(o.toString());
 
 Review comment:
   Hi @wuchong I think it is a valid point to consider.
   
   I did a really quick check what other libraries/frameworks do and from what 
I saw they usually support values without units. I think it is rather well 
understood what are the default units.
   
   I checked:
   - lightbend config: 
https://github.com/lightbend/config/blob/e3ec7d3490be6a446bdd173fabdaeec88b6e885e/config/src/main/java/com/typesafe/config/impl/SimpleConfig.java#L836
   - spring configuration: 
https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-external-config-conversion-duration
   - spark: 
https://spark.apache.org/docs/latest/configuration.html#spark-properties
   
   WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to