This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 58b8cb00baa766c19608f583601c2c24ed9213e0 Author: ifndef-SleePy <[email protected]> AuthorDate: Fri Jun 14 22:32:30 2019 +0800 [FLINK-12832][datastream] Use new StreamGraphGenerator configurability in Blink Table Runner --- .../flink/table/api/BatchTableEnvironment.scala | 39 ++++++++++------------ .../flink/table/api/StreamTableEnvironment.scala | 12 +++++-- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 8731cd7..c3c36ef 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -19,9 +19,10 @@ package org.apache.flink.table.api import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.jobgraph.ScheduleMode import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment} import org.apache.flink.streaming.api.graph.{StreamGraph, StreamGraphGenerator} import org.apache.flink.streaming.api.transformations.StreamTransformation import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef @@ -36,7 +37,7 @@ import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil} import org.apache.flink.table.sinks._ import org.apache.flink.table.sources._ import org.apache.flink.table.util.PlanUtil - +import org.apache.flink.util.InstantiationUtil import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef} import org.apache.calcite.rel.{RelCollationTraitDef, RelNode} import org.apache.calcite.sql.SqlExplainLevel @@ -98,26 +99,22 @@ class BatchTableEnvironment( streamingTransformations: Seq[StreamTransformation[_]], jobName: Option[String]): StreamGraph = { mergeParameters() - streamEnv.getConfig - //.enableObjectReuse() // TODO add object reuse config in table config for batch and stream - .setLatencyTrackingInterval(-1L) - streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - streamEnv.setBufferTimeout(-1L) - if (streamEnv.getCheckpointConfig.isCheckpointingEnabled) { - throw new TableException("Checkpoint should be disabled on Batch job.") - } - // TODO introduce StreamGraphGenerator#Context to support following features: - // disable all CheckpointConfig - // setChainingEnabled - // setMultiHeadChainMode - // setSlotSharingEnabled - // setScheduleMode - // setChainEagerlyEnabled - - val streamGraph = StreamGraphGenerator.generate(streamEnv, streamingTransformations.toList) - streamGraph.setJobName(jobName.getOrElse(DEFAULT_JOB_NAME)) - streamGraph + // TODO avoid cloning ExecutionConfig + val executionConfig = InstantiationUtil.clone(streamEnv.getConfig) + executionConfig.enableObjectReuse() + executionConfig.setLatencyTrackingInterval(-1) + + new StreamGraphGenerator(streamingTransformations.toList, executionConfig, new CheckpointConfig) + .setChaining(streamEnv.isChainingEnabled) + .setStateBackend(streamEnv.getStateBackend) + .setDefaultBufferTimeout(-1) + .setTimeCharacteristic(TimeCharacteristic.ProcessingTime) + .setUserArtifacts(streamEnv.getCachedFiles) + .setSlotSharingEnabled(false) + .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES) + .setJobName(jobName.getOrElse(DEFAULT_JOB_NAME)) + .generate() } override private[flink] def translateToExecNodeDag(rels: Seq[RelNode]): Seq[ExecNode[_, _]] = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 7af61b4..d5565fc 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -119,9 +119,15 @@ abstract class StreamTableEnvironment( jobName: Option[String] = None): StreamGraph = { mergeParameters() - val streamGraph = StreamGraphGenerator.generate(execEnv, streamingTransformations.toList) - streamGraph.setJobName(jobName.getOrElse(DEFAULT_JOB_NAME)) - streamGraph + new StreamGraphGenerator( + streamingTransformations.toList, execEnv.getConfig, execEnv.getCheckpointConfig) + .setChaining(execEnv.isChainingEnabled) + .setDefaultBufferTimeout(execEnv.getBufferTimeout) + .setStateBackend(execEnv.getStateBackend) + .setTimeCharacteristic(execEnv.getStreamTimeCharacteristic) + .setUserArtifacts(execEnv.getCachedFiles) + .setJobName(jobName.getOrElse(DEFAULT_JOB_NAME)) + .generate() } /**
