KurtYoung commented on a change in pull request #9057: [FLINK-13121] [table-planner-blink] Set batch properties to runtime in blink batch executor
URL: https://github.com/apache/flink/pull/9057#discussion_r302513082
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
 ##########
 @@ -42,47 +43,108 @@
 @Internal
 public class BatchExecutor extends ExecutorBase {
 
+       private boolean enableObjectReuse;
+       private long latencyTrackingInterval;
+       private long bufferTimeout;
+       private TimeCharacteristic timeCharacteristic;
+       private InputDependencyConstraint inputDependencyConstraint;
+
        @VisibleForTesting
        public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
                super(executionEnvironment);
        }
 
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
-               if (transformations.isEmpty()) {
-                       throw new TableException("No table sinks have been created yet. " +
-                               "A program needs at least one sink that consumes data. ");
-               }
                StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-               StreamGraph streamGraph = generateStreamGraph(execEnv, transformations, getNonEmptyJobName(jobName));
-
-               // TODO supports streamEnv.execute(streamGraph)
-               try {
-                       return execEnv.execute(getNonEmptyJobName(jobName));
-               } finally {
-                       transformations.clear();
-               }
+               StreamGraph streamGraph = generateStreamGraph(transformations, jobName);
+               return execEnv.execute(streamGraph);
        }
 
-       public static StreamGraph generateStreamGraph(
-               StreamExecutionEnvironment execEnv,
-               List<Transformation<?>> transformations,
-               String jobName) throws Exception {
-               // TODO avoid cloning ExecutionConfig
-               ExecutionConfig executionConfig = InstantiationUtil.clone(execEnv.getConfig());
+       /**
+        * Back up the previous streamEnv config and set batch configs.
+        */
+       private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
+               ExecutionConfig executionConfig = execEnv.getConfig();
+
+               enableObjectReuse = executionConfig.isObjectReuseEnabled();
                executionConfig.enableObjectReuse();
+
+               latencyTrackingInterval = executionConfig.getLatencyTrackingInterval();
                executionConfig.setLatencyTrackingInterval(-1);
 
-               return new StreamGraphGenerator(transformations, executionConfig, new CheckpointConfig())
-                       .setChaining(execEnv.isChainingEnabled())
-                       .setStateBackend(execEnv.getStateBackend())
-                       .setDefaultBufferTimeout(-1)
-                       .setTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-                       .setUserArtifacts(execEnv.getCachedFiles())
-                       .setSlotSharingEnabled(false)
-                       .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES)
-                       .setJobName(jobName)
-                       .generate();
+               timeCharacteristic = execEnv.getStreamTimeCharacteristic();
+               execEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               bufferTimeout = execEnv.getBufferTimeout();
+               execEnv.setBufferTimeout(-1);
+
+               inputDependencyConstraint = executionConfig.getDefaultInputDependencyConstraint();
+               if (isShuffleModeAllBatch()) {
+                       executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
+               }
        }
 
+       /**
+        * Restore the previous streamEnv config after executing batch jobs.
+        */
+       private void restoreStreamEnv(StreamExecutionEnvironment execEnv) {
+               ExecutionConfig executionConfig = execEnv.getConfig();
+               if (enableObjectReuse) {
+                       executionConfig.enableObjectReuse();
+               } else {
+                       executionConfig.disableObjectReuse();
+               }
+               executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
+               execEnv.setStreamTimeCharacteristic(timeCharacteristic);
+               execEnv.setBufferTimeout(bufferTimeout);
+               if (isShuffleModeAllBatch()) {
+                       executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
+               }
+       }
+
+       /**
+        * Translates the given transformations into a StreamGraph.
+        */
+       public StreamGraph generateStreamGraph(
+                       List<Transformation<?>> transformations,
+                       String jobName) throws Exception {
+               StreamExecutionEnvironment execEnv = getExecutionEnvironment();
+               backupAndUpdateStreamEnv(execEnv);
+               transformations.forEach(execEnv::addOperator);
+               StreamGraph streamGraph;
+               try {
+                       streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
+                       // If one transformation uses managed memory, all transformations should set managed memory to 0.
+                       ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
+                       streamGraph.getStreamNodes().forEach(sn -> {
+                               sn.setResources(sn.getMinResources().merge(managedResourceSpec), sn.getPreferredResources().merge(managedResourceSpec));
+                       });
+                       streamGraph.setChaining(true);
+                       streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+                       streamGraph.setStateBackend(null);
+                       streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
+                       if (isShuffleModeAllBatch()) {
+                               streamGraph.setBlockingConnectionsBetweenChains(true);
+                       }
+               } finally {
+                       restoreStreamEnv(execEnv);
+               }
+               return streamGraph;
+       }
+
+       private boolean isShuffleModeAllBatch() {
+               ExecutionConfig.GlobalJobParameters parameters = getExecutionEnvironment().getConfig().getGlobalJobParameters();
 
 Review comment:
   Why don't you read from `TableConfig`?
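
   For illustration only, a minimal sketch of what reading this flag from `TableConfig` could look like, assuming the executor were handed a `TableConfig` reference; the `tableConfig` field and the `table.exec.shuffle-mode` key below are assumptions for this sketch, not code from this PR:

   ```java
   // Hypothetical alternative: derive the shuffle mode from TableConfig rather than
   // from the environment's GlobalJobParameters. The `tableConfig` field and the
   // "table.exec.shuffle-mode" key are illustrative assumptions.
   private TableConfig tableConfig; // would need to be injected, e.g. via the constructor

   private boolean isShuffleModeAllBatch() {
           // TableConfig#getConfiguration() exposes the underlying key-value configuration.
           String shuffleMode = tableConfig.getConfiguration()
                   .getString("table.exec.shuffle-mode", "batch");
           return "batch".equalsIgnoreCase(shuffleMode);
   }
   ```

   Whether a `TableConfig` instance is actually reachable from the executor at this point in the stack is exactly the open question of this review thread.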
