zentol commented on code in PR #20953:
URL: https://github.com/apache/flink/pull/20953#discussion_r989016671
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java:
##########
@@ -148,11 +150,30 @@ private CompletableFuture<JobGraph> loadJobGraph(
HttpResponseStatus.BAD_REQUEST,
e));
}
+ try {
+ applyParallelismOverrides(jobGraph);
+ } catch (Exception e) {
+ throw new CompletionException(
+ new RestHandlerException(
+ "Failed to apply parallelism overrides",
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ e));
+ }
return jobGraph;
},
executor);
}
+ private void applyParallelismOverrides(JobGraph jobGraph) {
+ Map<String, Integer> overrides = configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
Review Comment:
It's rather janky that this only works in the job submit handler.
Why should it not work in application mode, for jar submissions, or when run
locally?
It just looks like the wrong place.
##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -165,6 +166,14 @@ public class PipelineOptions {
"Register a custom, serializable user configuration object. The configuration can be "
+ " accessed in operators");
+ public static final ConfigOption<Map<String, Integer>> PARALLELISM_OVERRIDES =
Review Comment:
This isn't documented.
##########
flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java:
##########
@@ -64,22 +64,37 @@
*
* <ul>
* <li>typeClass == atomic class (e.g. {@code Integer.class}) -> {@code ConfigOption<Integer>}
- * <li>typeClass == {@code Map.class} -> {@code ConfigOption<Map<String, String>>}
- * <li>typeClass == atomic class and isList == true for {@code ConfigOption<List<Integer>>}
+ * <li>typeClass == atomic class and type == LIST for {@code ConfigOption<List<Integer>>}
+ * <li>typeClass == atomic class and type == MAP for {@code ConfigOption<Map<String,
+ * Integer>>}
* </ul>
*/
private final Class<?> clazz;
- private final boolean isList;
+ enum Type {
+ VALUE,
+ LIST,
+ MAP
+ }
Review Comment:
This should be a separate commit (and maybe ticket).
##########
flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java:
##########
@@ -160,8 +157,16 @@ public <T extends Enum<T>> TypedConfigOptionBuilder<T> enumType(Class<T> enumCla
* Defines that the value of the option should be a set of properties, which can be
* represented as {@code Map<String, String>}.
*/
- public TypedConfigOptionBuilder<Map<String, String>> mapType() {
- return new TypedConfigOptionBuilder<>(key, PROPERTIES_MAP_CLASS);
+ public MapConfigOptionBuilder<String> mapType() {
+ return mapType(String.class);
+ }
+
+ /**
+ * Defines that the value of the option should be a set of properties, which can be
+ * represented as {@code Map<String, String>}.
Review Comment:
```suggestion
* represented as {@code Map<String, T>}.
```
##########
flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java:
##########
@@ -270,7 +283,33 @@ public final ConfigOption<List<E>> defaultValues(E... values) {
* @return The config option without a default value.
*/
public ConfigOption<List<E>> noDefaultValue() {
- return new ConfigOption<>(key, clazz, ConfigOption.EMPTY_DESCRIPTION, null, true);
+ return new ConfigOption<>(key, clazz, ConfigOption.EMPTY_DESCRIPTION, null, LIST);
+ }
+ }
+
+ /** Builder for map type {@link ConfigOption} with a value type V. */
+ public static class MapConfigOptionBuilder<V> {
+ private final String key;
+ private final Class<V> clazz;
+
+ MapConfigOptionBuilder(String key, Class<V> clazz) {
+ this.key = key;
+ this.clazz = clazz;
+ }
+
+ /** Defines that the option's type should be a list of previously defined atomic type. */
+ @SuppressWarnings("rawtypes")
Review Comment:
Is there a particular reason you don't cast to
`ListConfigOptionBuilder<Map<String, V>>`?
##########
flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java:
##########
@@ -217,14 +222,22 @@ public ListConfigOptionBuilder<T> asList() {
return new ListConfigOptionBuilder<>(key, clazz);
}
+ /**
+ * Defines that the option's type should be a map with values of the previously defined
+ * atomic type.
+ */
+ public MapConfigOptionBuilder<T> asMap() {
+ return new MapConfigOptionBuilder<>(key, clazz);
+ }
Review Comment:
Unused (and not tested)?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java:
##########
@@ -148,11 +150,30 @@ private CompletableFuture<JobGraph> loadJobGraph(
HttpResponseStatus.BAD_REQUEST,
e));
}
+ try {
+ applyParallelismOverrides(jobGraph);
+ } catch (Exception e) {
+ throw new CompletionException(
+ new RestHandlerException(
+ "Failed to apply parallelism overrides",
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ e));
+ }
return jobGraph;
},
executor);
}
+ private void applyParallelismOverrides(JobGraph jobGraph) {
+ Map<String, Integer> overrides = configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+ for (JobVertex vertex : jobGraph.getVertices()) {
+ Integer override = overrides.get(vertex.getID().toHexString());
+ if (override != null) {
+ vertex.setParallelism(override);
Review Comment:
These should be logged, at least at debug level; otherwise, good luck figuring
out where this comes from.
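To illustrate the logging request, here is a minimal, self-contained sketch. It is not the PR's code: `Vertex` is a hypothetical stand-in for `JobVertex`, `applyOverrides` is a hypothetical helper, and `java.util.logging` at `FINE` stands in for the debug-level logging the handler would do with its own logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ParallelismOverrideSketch {
    private static final Logger LOG =
            Logger.getLogger(ParallelismOverrideSketch.class.getName());

    /** Hypothetical stand-in for JobVertex, reduced to the two fields used here. */
    static final class Vertex {
        final String hexId;
        int parallelism;

        Vertex(String hexId, int parallelism) {
            this.hexId = hexId;
            this.parallelism = parallelism;
        }
    }

    /**
     * Applies overrides keyed by vertex id, logs every change at FINE (debug),
     * and returns the ids of the vertices that were changed.
     */
    static List<String> applyOverrides(List<Vertex> vertices, Map<String, Integer> overrides) {
        List<String> changed = new ArrayList<>();
        for (Vertex vertex : vertices) {
            Integer override = overrides.get(vertex.hexId);
            if (override != null) {
                // Log before mutating so the message carries both old and new values.
                LOG.log(
                        Level.FINE,
                        "Changing parallelism of vertex {0} from {1} to {2} due to a configured override",
                        new Object[] {vertex.hexId, vertex.parallelism, override});
                vertex.parallelism = override;
                changed.add(vertex.hexId);
            }
        }
        return changed;
    }
}
```

Logging the old and the new value together is what makes an unexpected parallelism traceable back to the configuration afterwards.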
##########
flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java:
##########
@@ -165,6 +166,14 @@ public class PipelineOptions {
"Register a custom, serializable user configuration object. The configuration can be "
+ " accessed in operators");
+ public static final ConfigOption<Map<String, Integer>> PARALLELISM_OVERRIDES =
+ key("pipeline.jobvertex-parallelism-overrides")
+ .mapType(Integer.class)
+ .defaultValue(Collections.emptyMap())
+ .withDescription(
+ "A parallelism override map (jobVertexId -> parallelism) which will be used to update"
+ " the parallelism of the corresponding job vertices of submitted JobGraphs.");
Review Comment:
Can we have a description that exposes fewer internals?
job graph -> application
job vertices -> operator
##########
flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java:
##########
@@ -435,6 +446,16 @@ public void testMapRemovePrefix() {
assertFalse(cfg.containsKey(MAP_PROPERTY_2));
}
+ @Test
+ public void testMapWorksWithNonStringValue() {
+ final Configuration cfg = new Configuration();
+ cfg.setString(MAP2_PROPERTY_1, "1");
+ cfg.setString(MAP2_PROPERTY_2, "2");
+
+ Map<String, Integer> stringIntegerMap = cfg.get(MAP_OPTION_INTEGER_TYPE);
+ assertEquals(1L, (long) stringIntegerMap.get("prop1"));
Review Comment:
Can you make this a long option to be a bit less weird, without the cast to
a long?
##########
flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java:
##########
@@ -160,8 +157,16 @@ public <T extends Enum<T>> TypedConfigOptionBuilder<T> enumType(Class<T> enumCla
* Defines that the value of the option should be a set of properties, which can be
* represented as {@code Map<String, String>}.
*/
- public TypedConfigOptionBuilder<Map<String, String>> mapType() {
- return new TypedConfigOptionBuilder<>(key, PROPERTIES_MAP_CLASS);
+ public MapConfigOptionBuilder<String> mapType() {
+ return mapType(String.class);
+ }
+
+ /**
+ * Defines that the value of the option should be a set of properties, which can be
+ * represented as {@code Map<String, String>}.
+ */
+ public <T> MapConfigOptionBuilder<T> mapType(Class<T> valueClazz) {
Review Comment:
This should be _yet another_ commit and ticket