This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 399de8b [FLINK-13117][table-planner-blink] Do not need to backup and
restore streamEnv config in BatchExecutor
399de8b is described below
commit 399de8b7d43094561efef185989ba714c2994f95
Author: Xupingyong <[email protected]>
AuthorDate: Fri Jul 19 16:39:53 2019 +0800
[FLINK-13117][table-planner-blink] Do not need to backup and restore
streamEnv config in BatchExecutor
This closes #9179
---
.../apache/flink/table/executor/BatchExecutor.java | 55 ++-------------
.../flink/table/executor/BatchExecutorTest.scala | 81 ----------------------
2 files changed, 6 insertions(+), 130 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index adafab1..0cf5169 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -43,8 +43,6 @@ import java.util.List;
@Internal
public class BatchExecutor extends ExecutorBase {
- private BatchExecEnvConfig batchExecEnvConfig = new
BatchExecEnvConfig();
-
@VisibleForTesting
public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
super(executionEnvironment);
@@ -58,10 +56,9 @@ public class BatchExecutor extends ExecutorBase {
}
/**
- * Backup previous streamEnv config and set batch configs.
+ * Sets batch configs.
*/
- private void backupAndUpdateStreamEnv(StreamExecutionEnvironment
execEnv) {
- batchExecEnvConfig.backup(execEnv);
+ private void setBatchProperties(StreamExecutionEnvironment execEnv) {
ExecutionConfig executionConfig = execEnv.getConfig();
executionConfig.enableObjectReuse();
executionConfig.setLatencyTrackingInterval(-1);
@@ -77,7 +74,7 @@ public class BatchExecutor extends ExecutorBase {
*/
public StreamGraph generateStreamGraph(List<Transformation<?>>
transformations, String jobName) {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- backupAndUpdateStreamEnv(execEnv);
+ setBatchProperties(execEnv);
transformations.forEach(execEnv::addOperator);
StreamGraph streamGraph;
streamGraph =
execEnv.getStreamGraph(getNonEmptyJobName(jobName));
@@ -91,11 +88,12 @@ public class BatchExecutor extends ExecutorBase {
streamGraph.setChaining(true);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
streamGraph.setStateBackend(null);
-
streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
+ if (streamGraph.getCheckpointConfig().isCheckpointingEnabled())
{
+ throw new IllegalArgumentException("Checkpoint is not
supported for batch jobs.");
+ }
if (isShuffleModeAllBatch()) {
streamGraph.setBlockingConnectionsBetweenChains(true);
}
- batchExecEnvConfig.restore(execEnv);
return streamGraph;
}
@@ -109,45 +107,4 @@ public class BatchExecutor extends ExecutorBase {
}
return false;
}
-
- /**
- * Batch configs that are set in {@link StreamExecutionEnvironment}. We
should backup and change
- * these configs and restore finally.
- */
- private static class BatchExecEnvConfig {
-
- private boolean enableObjectReuse;
- private long latencyTrackingInterval;
- private long bufferTimeout;
- private TimeCharacteristic timeCharacteristic;
- private InputDependencyConstraint inputDependencyConstraint;
-
- /**
- * Backup previous streamEnv config.
- */
- public void backup(StreamExecutionEnvironment execEnv) {
- ExecutionConfig executionConfig = execEnv.getConfig();
- enableObjectReuse =
executionConfig.isObjectReuseEnabled();
- latencyTrackingInterval =
executionConfig.getLatencyTrackingInterval();
- timeCharacteristic =
execEnv.getStreamTimeCharacteristic();
- bufferTimeout = execEnv.getBufferTimeout();
- inputDependencyConstraint =
executionConfig.getDefaultInputDependencyConstraint();
- }
-
- /**
- * Restore previous streamEnv after execute batch jobs.
- */
- public void restore(StreamExecutionEnvironment execEnv) {
- ExecutionConfig executionConfig = execEnv.getConfig();
- if (enableObjectReuse) {
- executionConfig.enableObjectReuse();
- } else {
- executionConfig.disableObjectReuse();
- }
-
executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
- execEnv.setStreamTimeCharacteristic(timeCharacteristic);
- execEnv.setBufferTimeout(bufferTimeout);
-
executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
- }
- }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
deleted file mode 100644
index cca8e1a..0000000
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.executor
-
-import org.apache.flink.api.common.InputDependencyConstraint
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.transformations.ShuffleMode
-import org.apache.flink.table.api.ExecutionConfigOptions
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
-
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Before, Test}
-
-/**
- * Test for streamEnv config save and restore when run batch jobs.
- */
-class BatchExecutorTest extends TableTestBase {
-
- private var util: BatchTableTestUtil = _
-
- @Before
- def setUp(): Unit = {
- util = batchTestUtil()
- util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
- util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
- }
-
- @Test
- def testRestoreConfig(): Unit = {
- util.getStreamEnv.setBufferTimeout(11)
- util.getStreamEnv.getConfig.disableObjectReuse()
- util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
- util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
- util.verifyExplain("SELECT * FROM MyTable")
- assertEquals(11, util.getStreamEnv.getBufferTimeout)
- assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
- assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
- assertEquals(TimeCharacteristic.EventTime,
util.getStreamEnv.getStreamTimeCharacteristic)
- assertEquals(InputDependencyConstraint.ANY,
- util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
- }
-
- @Test
- def testRestoreConfigWhenBatchShuffleMode(): Unit = {
- util.getTableEnv.getConfig.getConfiguration.setString(
- ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
- ShuffleMode.BATCH.toString)
- util.getStreamEnv.setBufferTimeout(11)
- util.getStreamEnv.getConfig.disableObjectReuse()
- util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
- util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
- util.verifyExplain("SELECT * FROM MyTable")
- assertEquals(11, util.getStreamEnv.getBufferTimeout)
- assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
- assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
- assertEquals(TimeCharacteristic.EventTime,
util.getStreamEnv.getStreamTimeCharacteristic)
- assertEquals(InputDependencyConstraint.ANY,
- util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
- }
-}