aljoscha commented on a change in pull request #10217:
[FLINK-14786][configuration] Add configure method to ExecutionConfig
URL: https://github.com/apache/flink/pull/10217#discussion_r347497693
##########
File path: flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java
##########
@@ -19,35 +19,191 @@
package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+import java.time.Duration;
import java.util.List;
+import java.util.Map;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
/**
* The {@link ConfigOption configuration options} for job execution.
*/
@PublicEvolving
public class PipelineOptions {
-
    /**
     * A list of jar files that contain the user-defined function (UDF) classes
     * and all classes used from within the UDFs.
     */
    public static final ConfigOption<List<String>> JARS =
-        key("pipeline.jars")
-            .stringType()
-            .asList()
-            .noDefaultValue()
-            .withDescription("A semicolon-separated list of the jars to package with the job jars to be sent to the cluster. These have to be valid paths.");
-
+        key("pipeline.jars")
+            .stringType()
+            .asList()
+            .noDefaultValue()
+            .withDescription("A semicolon-separated list of the jars to package with the job jars to be sent to the" +
+                " cluster. These have to be valid paths.");
    /**
     * A list of URLs that are added to the classpath of each user code classloader of the program.
     * Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
     */
    public static final ConfigOption<List<String>> CLASSPATHS =
-        key("pipeline.classpaths")
-            .stringType()
-            .asList()
-            .noDefaultValue()
-            .withDescription("A semicolon-separated list of the classpaths to package with the job jars to be sent to the cluster. These have to be valid URLs.");
+        key("pipeline.classpaths")
+            .stringType()
+            .asList()
+            .noDefaultValue()
+            .withDescription("A semicolon-separated list of the classpaths to package with the job jars to be sent to" +
+                " the cluster. These have to be valid URLs.");
+
+    public static final ConfigOption<Boolean> AUTO_GENERATE_UIDS =
+        key("pipeline.auto-generate-uids")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription(Description.builder()
+                .text("When auto-generated UIDs are disabled, users are forced to manually specify UIDs on DataStream applications.")
+                .linebreak()
+                .linebreak()
+                .text("It is highly recommended that users specify UIDs before deploying to" +
+                    " production since they are used to match state in savepoints to operators" +
+                    " in a job. Because auto-generated IDs are likely to change when modifying" +
+                    " a job, specifying custom IDs allows an application to evolve over time" +
+                    " without discarding state.")
+                .build());
+
+    public static final ConfigOption<Boolean> AUTO_TYPE_REGISTRATION =
+        key("pipeline.auto-type-registration")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("Controls whether Flink automatically registers all types in the user programs with" +
+                " Kryo.");
+
+    public static final ConfigOption<Duration> AUTO_WATERMARK_INTERVAL =
+        key("pipeline.auto-watermark-interval")
+            .durationType()
+            .defaultValue(Duration.ZERO)
+            .withDescription("The interval of the automatic watermark emission. Watermarks are used throughout" +
+                " the streaming system to keep track of the progress of time. They are used, for example," +
+                " for time-based windowing.");
+
+    public static final ConfigOption<ExecutionConfig.ClosureCleanerLevel> CLOSURE_CLEANER_LEVEL =
+        key("pipeline.closure-cleaner-level")
+            .enumType(ExecutionConfig.ClosureCleanerLevel.class)
+            .defaultValue(ExecutionConfig.ClosureCleanerLevel.RECURSIVE)
+            .withDescription(Description.builder()
+                .text("Configures the mode in which the closure cleaner works.")
+                .list(
+                    text("NONE - disables the closure cleaner completely"),
+                    text("TOP_LEVEL - cleans only the top-level class without recursing into fields"),
+                    text("RECURSIVE - cleans all the fields recursively")
+                )
+                .build());
+
+    public static final ConfigOption<Boolean> FORCE_AVRO =
+        key("pipeline.force-avro")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription(Description.builder()
+                .text("Forces Flink to use the Apache Avro serializer for POJOs.")
+                .linebreak()
+                .linebreak()
+                .text("Important: Make sure to include the %s module.", code("flink-avro"))
+                .build());
+
+    public static final ConfigOption<Boolean> FORCE_KRYO =
+        key("pipeline.force-kryo")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("If enabled, forces the TypeExtractor to use the Kryo serializer for POJOs even though" +
+                " they could be analyzed as POJOs. In some cases this might be preferable, for example when using" +
+                " interfaces with subclasses that cannot be analyzed as POJOs.");
+
+    public static final ConfigOption<Boolean> GENERIC_TYPES =
+        key("pipeline.generic-types")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription(Description.builder()
+                .text("If the use of generic types is disabled, Flink will throw an %s whenever it encounters" +
+                        " a data type that would go through Kryo for serialization.",
+                    code("UnsupportedOperationException"))
+                .linebreak()
+                .linebreak()
+                .text("Disabling generic types can be helpful to eagerly find and eliminate the use of types" +
+                    " that would go through Kryo serialization during runtime. Rather than checking types" +
+                    " individually, using this option will throw exceptions eagerly in the places where generic" +
+                    " types are used.")
+                .linebreak()
+                .linebreak()
+                .text("We recommend using this option only during development and pre-production" +
+                    " phases, not during actual production use. The application program and/or the input data may be" +
+                    " such that new, previously unseen, types occur at some point. In that case, setting this option" +
+                    " would cause the program to fail.")
+                .build());
+
+    public static final ConfigOption<Map<String, String>> GLOBAL_JOB_PARAMETERS =
+        key("pipeline.global-job-parameters")
+            .mapType()
+            .noDefaultValue()
+            .withDescription("Register a custom, serializable user configuration object. The configuration can be" +
+                " accessed in operators.");
+
+    public static final ConfigOption<Integer> MAX_PARALLELISM =
+        key("pipeline.max-parallelism")
+            .intType()
+            .defaultValue(-1)
+            .withDescription("The program wide maximum parallelism used for operators which haven't specified a " +
Review comment:
I think it should be `program-wide`
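
As a side note, here is a minimal sketch of how these options are meant to be consumed end-to-end. It assumes the generic `Configuration#set(ConfigOption, value)` setter and an `ExecutionConfig#configure(ReadableConfig, ClassLoader)` signature inferred from the PR title, so the exact names may differ from the final patch:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class PipelineOptionsExample {

    public static void main(String[] args) {
        // Collect some of the new pipeline.* options in a plain Configuration.
        Configuration conf = new Configuration();
        conf.set(PipelineOptions.AUTO_GENERATE_UIDS, false);
        conf.set(PipelineOptions.MAX_PARALLELISM, 128);
        conf.set(PipelineOptions.CLOSURE_CLEANER_LEVEL,
            ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL);

        // Apply them to an ExecutionConfig through the configure method
        // this PR introduces (signature assumed, not confirmed by the diff).
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.configure(conf, PipelineOptionsExample.class.getClassLoader());
    }
}
```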
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services