This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 399de8b  [FLINK-13117][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor
399de8b is described below

commit 399de8b7d43094561efef185989ba714c2994f95
Author: Xupingyong <[email protected]>
AuthorDate: Fri Jul 19 16:39:53 2019 +0800

    [FLINK-13117][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor
    
    This closes #9179
---
 .../apache/flink/table/executor/BatchExecutor.java | 55 ++-------------
 .../flink/table/executor/BatchExecutorTest.scala   | 81 ----------------------
 2 files changed, 6 insertions(+), 130 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index adafab1..0cf5169 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -43,8 +43,6 @@ import java.util.List;
 @Internal
 public class BatchExecutor extends ExecutorBase {
 
-       private BatchExecEnvConfig batchExecEnvConfig = new BatchExecEnvConfig();
-
        @VisibleForTesting
        public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
                super(executionEnvironment);
@@ -58,10 +56,9 @@ public class BatchExecutor extends ExecutorBase {
        }
 
        /**
-        * Backup previous streamEnv config and set batch configs.
+        * Sets batch configs.
         */
-       private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
-               batchExecEnvConfig.backup(execEnv);
+       private void setBatchProperties(StreamExecutionEnvironment execEnv) {
                ExecutionConfig executionConfig = execEnv.getConfig();
                executionConfig.enableObjectReuse();
                executionConfig.setLatencyTrackingInterval(-1);
@@ -77,7 +74,7 @@ public class BatchExecutor extends ExecutorBase {
         */
        public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
                StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-               backupAndUpdateStreamEnv(execEnv);
+               setBatchProperties(execEnv);
                transformations.forEach(execEnv::addOperator);
                StreamGraph streamGraph;
                streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
@@ -91,11 +88,12 @@ public class BatchExecutor extends ExecutorBase {
                streamGraph.setChaining(true);
                streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
                streamGraph.setStateBackend(null);
-               streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
+               if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
+                       throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
+               }
                if (isShuffleModeAllBatch()) {
                        streamGraph.setBlockingConnectionsBetweenChains(true);
                }
-               batchExecEnvConfig.restore(execEnv);
                return streamGraph;
        }
 
@@ -109,45 +107,4 @@ public class BatchExecutor extends ExecutorBase {
                }
                return false;
        }
-
-       /**
-        * Batch configs that are set in {@link StreamExecutionEnvironment}. We should backup and change
-        * these configs and restore finally.
-        */
-       private static class BatchExecEnvConfig {
-
-               private boolean enableObjectReuse;
-               private long latencyTrackingInterval;
-               private long bufferTimeout;
-               private TimeCharacteristic timeCharacteristic;
-               private InputDependencyConstraint inputDependencyConstraint;
-
-               /**
-                * Backup previous streamEnv config.
-                */
-               public void backup(StreamExecutionEnvironment execEnv) {
-                       ExecutionConfig executionConfig = execEnv.getConfig();
-                       enableObjectReuse = executionConfig.isObjectReuseEnabled();
-                       latencyTrackingInterval = executionConfig.getLatencyTrackingInterval();
-                       timeCharacteristic = execEnv.getStreamTimeCharacteristic();
-                       bufferTimeout = execEnv.getBufferTimeout();
-                       inputDependencyConstraint = executionConfig.getDefaultInputDependencyConstraint();
-               }
-
-               /**
-                * Restore previous streamEnv after execute batch jobs.
-                */
-               public void restore(StreamExecutionEnvironment execEnv) {
-                       ExecutionConfig executionConfig = execEnv.getConfig();
-                       if (enableObjectReuse) {
-                               executionConfig.enableObjectReuse();
-                       } else {
-                               executionConfig.disableObjectReuse();
-                       }
-                       executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
-                       execEnv.setStreamTimeCharacteristic(timeCharacteristic);
-                       execEnv.setBufferTimeout(bufferTimeout);
-                       executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
-               }
-       }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
deleted file mode 100644
index cca8e1a..0000000
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.executor
-
-import org.apache.flink.api.common.InputDependencyConstraint
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.transformations.ShuffleMode
-import org.apache.flink.table.api.ExecutionConfigOptions
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
-
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Before, Test}
-
-/**
-  * Test for streamEnv config save and restore when run batch jobs.
-  */
-class BatchExecutorTest extends TableTestBase {
-
-  private var util: BatchTableTestUtil = _
-
-  @Before
-  def setUp(): Unit = {
-    util = batchTestUtil()
-    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
-    util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
-  }
-
-  @Test
-  def testRestoreConfig(): Unit = {
-    util.getStreamEnv.setBufferTimeout(11)
-    util.getStreamEnv.getConfig.disableObjectReuse()
-    util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
-    util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
-    util.verifyExplain("SELECT * FROM MyTable")
-    assertEquals(11, util.getStreamEnv.getBufferTimeout)
-    assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
-    assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
-    assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
-    assertEquals(InputDependencyConstraint.ANY,
-      util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
-  }
-
-  @Test
-  def testRestoreConfigWhenBatchShuffleMode(): Unit = {
-    util.getTableEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
-      ShuffleMode.BATCH.toString)
-    util.getStreamEnv.setBufferTimeout(11)
-    util.getStreamEnv.getConfig.disableObjectReuse()
-    util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
-    util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
-    util.verifyExplain("SELECT * FROM MyTable")
-    assertEquals(11, util.getStreamEnv.getBufferTimeout)
-    assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
-    assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
-    assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
-    assertEquals(InputDependencyConstraint.ANY,
-      util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
-  }
-}

Reply via email to