KurtYoung commented on a change in pull request #9057: [FLINK-13121]
[table-planner-blink] Set batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#discussion_r302513622
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
##########
@@ -42,47 +43,108 @@
@Internal
public class BatchExecutor extends ExecutorBase {
+ private boolean enableObjectReuse;
+ private long latencyTrackingInterval;
+ private long bufferTimeout;
+ private TimeCharacteristic timeCharacteristic;
+ private InputDependencyConstraint inputDependencyConstraint;
+
@VisibleForTesting
public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
super(executionEnvironment);
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- if (transformations.isEmpty()) {
- throw new TableException("No table sinks have been
created yet. " +
- "A program needs at least one sink that
consumes data. ");
- }
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- StreamGraph streamGraph = generateStreamGraph(execEnv,
transformations, getNonEmptyJobName(jobName));
-
- // TODO supports streamEnv.execute(streamGraph)
- try {
- return execEnv.execute(getNonEmptyJobName(jobName));
- } finally {
- transformations.clear();
- }
+ StreamGraph streamGraph = generateStreamGraph(transformations,
jobName);
+ return execEnv.execute(streamGraph);
}
- public static StreamGraph generateStreamGraph(
- StreamExecutionEnvironment execEnv,
- List<Transformation<?>> transformations,
- String jobName) throws Exception {
- // TODO avoid cloning ExecutionConfig
- ExecutionConfig executionConfig =
InstantiationUtil.clone(execEnv.getConfig());
+ /**
+ * Backup previous streamEnv config and set batch configs.
+ */
+ private void backupAndUpdateStreamEnv(StreamExecutionEnvironment
execEnv) {
+ ExecutionConfig executionConfig = execEnv.getConfig();
+
+ enableObjectReuse = executionConfig.isObjectReuseEnabled();
executionConfig.enableObjectReuse();
+
+ latencyTrackingInterval =
executionConfig.getLatencyTrackingInterval();
executionConfig.setLatencyTrackingInterval(-1);
- return new StreamGraphGenerator(transformations,
executionConfig, new CheckpointConfig())
- .setChaining(execEnv.isChainingEnabled())
- .setStateBackend(execEnv.getStateBackend())
- .setDefaultBufferTimeout(-1)
-
.setTimeCharacteristic(TimeCharacteristic.ProcessingTime)
- .setUserArtifacts(execEnv.getCachedFiles())
- .setSlotSharingEnabled(false)
- .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES)
- .setJobName(jobName)
- .generate();
+ timeCharacteristic = execEnv.getStreamTimeCharacteristic();
+
execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ bufferTimeout = execEnv.getBufferTimeout();
+ execEnv.setBufferTimeout(-1);
+
+ inputDependencyConstraint =
executionConfig.getDefaultInputDependencyConstraint();
+ if (isShuffleModeAllBatch()) {
+
executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
+ }
}
+ /**
+ * Restore previous streamEnv after execute batch jobs.
+ */
+ private void restoreStreamEnv(StreamExecutionEnvironment execEnv) {
+ ExecutionConfig executionConfig = execEnv.getConfig();
+ if (enableObjectReuse) {
+ executionConfig.enableObjectReuse();
+ } else {
+ executionConfig.disableObjectReuse();
+ }
+
executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
+ execEnv.setStreamTimeCharacteristic(timeCharacteristic);
+ execEnv.setBufferTimeout(bufferTimeout);
+ if (isShuffleModeAllBatch()) {
+
executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
+ }
+ }
+
+ /**
+ * Translates transformationList to streamGraph.
+ */
+ public StreamGraph generateStreamGraph(
+ List<Transformation<?>> transformations,
+ String jobName) throws Exception {
Review comment:
This method does not throw any checked exception, so the `throws Exception` clause on its signature is unnecessary and can be removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services