This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58b8cb00baa766c19608f583601c2c24ed9213e0
Author: ifndef-SleePy <[email protected]>
AuthorDate: Fri Jun 14 22:32:30 2019 +0800

    [FLINK-12832][datastream] Use new StreamGraphGenerator configurability in 
Blink Table Runner
---
 .../flink/table/api/BatchTableEnvironment.scala    | 39 ++++++++++------------
 .../flink/table/api/StreamTableEnvironment.scala   | 12 +++++--
 2 files changed, 27 insertions(+), 24 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 8731cd7..c3c36ef 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -19,9 +19,10 @@ package org.apache.flink.table.api
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.jobgraph.ScheduleMode
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.environment.{CheckpointConfig, 
StreamExecutionEnvironment}
 import org.apache.flink.streaming.api.graph.{StreamGraph, StreamGraphGenerator}
 import org.apache.flink.streaming.api.transformations.StreamTransformation
 import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
@@ -36,7 +37,7 @@ import org.apache.flink.table.plan.util.{ExecNodePlanDumper, 
FlinkRelOptUtil}
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources._
 import org.apache.flink.table.util.PlanUtil
-
+import org.apache.flink.util.InstantiationUtil
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
 import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
 import org.apache.calcite.sql.SqlExplainLevel
@@ -98,26 +99,22 @@ class BatchTableEnvironment(
       streamingTransformations: Seq[StreamTransformation[_]],
       jobName: Option[String]): StreamGraph = {
     mergeParameters()
-    streamEnv.getConfig
-      //.enableObjectReuse() // TODO add object reuse config in table config 
for batch and stream
-      .setLatencyTrackingInterval(-1L)
-    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-    streamEnv.setBufferTimeout(-1L)
-    if (streamEnv.getCheckpointConfig.isCheckpointingEnabled) {
-      throw new TableException("Checkpoint should be disabled on Batch job.")
-    }
 
-    // TODO introduce StreamGraphGenerator#Context to support following 
features:
-    // disable all CheckpointConfig
-    // setChainingEnabled
-    // setMultiHeadChainMode
-    // setSlotSharingEnabled
-    // setScheduleMode
-    // setChainEagerlyEnabled
-
-    val streamGraph = StreamGraphGenerator.generate(streamEnv, 
streamingTransformations.toList)
-    streamGraph.setJobName(jobName.getOrElse(DEFAULT_JOB_NAME))
-    streamGraph
+    // TODO avoid cloning ExecutionConfig
+    val executionConfig = InstantiationUtil.clone(streamEnv.getConfig)
+    executionConfig.enableObjectReuse()
+    executionConfig.setLatencyTrackingInterval(-1)
+
+    new StreamGraphGenerator(streamingTransformations.toList, executionConfig, 
new CheckpointConfig)
+      .setChaining(streamEnv.isChainingEnabled)
+      .setStateBackend(streamEnv.getStateBackend)
+      .setDefaultBufferTimeout(-1)
+      .setTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+      .setUserArtifacts(streamEnv.getCachedFiles)
+      .setSlotSharingEnabled(false)
+      .setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES)
+      .setJobName(jobName.getOrElse(DEFAULT_JOB_NAME))
+      .generate()
   }
 
   override private[flink] def translateToExecNodeDag(rels: Seq[RelNode]): 
Seq[ExecNode[_, _]] = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 7af61b4..d5565fc 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -119,9 +119,15 @@ abstract class StreamTableEnvironment(
       jobName: Option[String] = None): StreamGraph = {
     mergeParameters()
 
-    val streamGraph = StreamGraphGenerator.generate(execEnv, 
streamingTransformations.toList)
-    streamGraph.setJobName(jobName.getOrElse(DEFAULT_JOB_NAME))
-    streamGraph
+    new StreamGraphGenerator(
+        streamingTransformations.toList, execEnv.getConfig, 
execEnv.getCheckpointConfig)
+      .setChaining(execEnv.isChainingEnabled)
+      .setDefaultBufferTimeout(execEnv.getBufferTimeout)
+      .setStateBackend(execEnv.getStateBackend)
+      .setTimeCharacteristic(execEnv.getStreamTimeCharacteristic)
+      .setUserArtifacts(execEnv.getCachedFiles)
+      .setJobName(jobName.getOrElse(DEFAULT_JOB_NAME))
+      .generate()
   }
 
   /**

Reply via email to