wuchong commented on a change in pull request #9976: [FLINK-14493][core]
Introduce data types to ConfigOptions.
URL: https://github.com/apache/flink/pull/9976#discussion_r339425139
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
##########
@@ -850,99 +856,173 @@ private void loggingFallback(FallbackKey fallbackKey,
ConfigOption<?> configOpti
// Type conversion
//
--------------------------------------------------------------------------------------------
- private int convertToInt(Object o, int defaultValue) {
+ @SuppressWarnings("unchecked")
+ private <T> T convertValue(Object rawValue, Class clazz) {
+ if (Integer.class.equals(clazz)) {
+ return (T) convertToInt(rawValue);
+ } else if (Long.class.equals(clazz)) {
+ return (T) convertToLong(rawValue);
+ } else if (Boolean.class.equals(clazz)) {
+ return (T) convertToBoolean(rawValue);
+ } else if (Float.class.equals(clazz)) {
+ return (T) convertToFloat(rawValue);
+ } else if (Double.class.equals(clazz)) {
+ return (T) convertToDouble(rawValue);
+ } else if (String.class.equals(clazz)) {
+ return (T) convertToString(rawValue);
+ } else if (clazz.isEnum()) {
+ return (T) convertToEnum(rawValue, clazz);
+ } else if (clazz == Duration.class) {
+ return (T) convertToDuration(rawValue);
+ } else if (clazz == MemorySize.class) {
+ return (T) convertToMemorySize(rawValue);
+ } else if (clazz == Map.class) {
+ return (T) convertToProperties(rawValue);
+ }
+
+ throw new IllegalArgumentException("Unsupported type: " +
clazz);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> T convertToList(Object rawValue, Class atomicClass) {
+ if (rawValue instanceof List) {
+ return (T) rawValue;
+ } else {
+ return (T)
StructuredOptionsSplitter.splitEscaped(rawValue.toString(), ';').stream()
+ .map(s -> convertValue(s, atomicClass))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<String, String> convertToProperties(Object o) {
+ if (o instanceof Map) {
+ return (Map<String, String>) o;
+ } else {
+ List<String> listOfRawProperties =
StructuredOptionsSplitter.splitEscaped(o.toString(), ',');
+ return listOfRawProperties.stream()
+ .map(s ->
StructuredOptionsSplitter.splitEscaped(s, ':'))
+ .map(pair -> {
+ if (pair.size() != 2) {
+ throw new
IllegalArgumentException("Could not parse pair in the map " + pair);
+ }
+
+ return pair;
+ })
+ .collect(Collectors.toMap(
+ a -> a.get(0),
+ a -> a.get(1)
+ ));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <E extends Enum<E>> E convertToEnum(Object o, Class<E> clazz) {
+ if (o.getClass().equals(clazz)) {
+ return (E) o;
+ }
+
+ return Enum.valueOf(clazz,
o.toString().toUpperCase(Locale.ROOT));
+ }
+
+ private Duration convertToDuration(Object o) {
+ if (o.getClass() == Duration.class) {
+ return (Duration) o;
+ }
+
+ return TimeUtils.parseDuration(o.toString());
+ }
+
+ private MemorySize convertToMemorySize(Object o) {
+ if (o.getClass() == MemorySize.class) {
+ return (MemorySize) o;
+ }
+
+ return MemorySize.parse(o.toString());
Review comment:
Currently, `MemorySize.parse` alllows to parse a number string without unit,
e.g. `1234` is parsed as 1234 bytes.
It means string without unit is allowed in every MemorySize options,
however, string without unit is confusing, and may should be forbidden.
Duration is the same.
What do you think about it? Shall we restrict it here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services