KurtYoung commented on a change in pull request #9057: [FLINK-13121]
[table-planner-blink] Set batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#discussion_r301976086
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
##########
@@ -42,47 +42,105 @@
@Internal
public class BatchExecutor extends ExecutorBase {
+ private boolean enableObjectReuse;
+ private long latencyTrackingInterval;
+ private long bufferTimeout;
+ private TimeCharacteristic timeCharacteristic;
+ private InputDependencyConstraint inputDependencyConstraint;
+
@VisibleForTesting
public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
super(executionEnvironment);
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- if (transformations.isEmpty()) {
- throw new TableException("No table sinks have been created yet. " +
- "A program needs at least one sink that consumes data. ");
- }
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- StreamGraph streamGraph = generateStreamGraph(execEnv, transformations, getNonEmptyJobName(jobName));
-
- // TODO supports streamEnv.execute(streamGraph)
- try {
- return execEnv.execute(getNonEmptyJobName(jobName));
- } finally {
- transformations.clear();
- }
+ StreamGraph streamGraph = generateStreamGraph(transformations, jobName);
+ return execEnv.execute(streamGraph);
}
- public static StreamGraph generateStreamGraph(
- StreamExecutionEnvironment execEnv,
- List<Transformation<?>> transformations,
- String jobName) throws Exception {
- // TODO avoid cloning ExecutionConfig
- ExecutionConfig executionConfig = InstantiationUtil.clone(execEnv.getConfig());
+ /**
+ * Backup previous streamEnv config and set batch configs.
+ */
+ private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
Review comment:
Why should we back up the old stream env config and restore it after job
execution? I think this kind of logic definitely needs some unit tests.
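
To make the request concrete, here is a minimal sketch of the back-up-and-restore pattern and the property a unit test could check. It does not use Flink classes: `EnvSettings`, `BackupRestoreSketch`, `executeWithBatchSettings`, and the batch values shown are all hypothetical stand-ins, not the API in this PR.

```java
/** Hypothetical stand-in for the mutable settings of an execution environment. */
final class EnvSettings {
    boolean objectReuse;
    long bufferTimeout;

    EnvSettings(boolean objectReuse, long bufferTimeout) {
        this.objectReuse = objectReuse;
        this.bufferTimeout = bufferTimeout;
    }
}

/** Hypothetical executor helper: batch settings apply only while the job runs. */
final class BackupRestoreSketch {

    /** A job that reads the environment settings and produces a result. */
    interface Job<T> {
        T run(EnvSettings env);
    }

    static <T> T executeWithBatchSettings(EnvSettings env, Job<T> job) {
        // Back up the caller's settings before touching them.
        boolean previousObjectReuse = env.objectReuse;
        long previousBufferTimeout = env.bufferTimeout;

        // Apply batch-specific values (illustrative values only).
        env.objectReuse = true;
        env.bufferTimeout = -1L;
        try {
            return job.run(env);
        } finally {
            // Restore even if the job throws, so the shared env is left unchanged.
            env.objectReuse = previousObjectReuse;
            env.bufferTimeout = previousBufferTimeout;
        }
    }
}
```

A test along these lines would assert two things: the job observes the batch values while it runs, and the environment holds its original values afterwards, on both the success path and the exception path.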