This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 2b486c5  [FLINK-23571][table-planner] Set internal query-start options when translate ExecNodeGraph to Transformation (#16675)
2b486c5 is described below

commit 2b486c5c88e5b5013c53cc629979316314a484cb
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Aug 3 00:32:01 2021 +0800

    [FLINK-23571][table-planner] Set internal query-start options when translate ExecNodeGraph to Transformation (#16675)
---
 .../table/planner/delegation/BatchPlanner.scala    |  5 +-
 .../table/planner/delegation/StreamPlanner.scala   |  6 +-
 .../planner/utils/InternalConfigOptionsTest.java   | 82 ++++++++++++++++++++++
 3 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 0326b18..ca7c5e6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -74,14 +74,17 @@ class BatchPlanner(
   }
 
   override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
+    validateAndOverrideConfiguration()
     val planner = createDummyPlanner()
 
-    execGraph.getRootNodes.map {
+    val transformations = execGraph.getRootNodes.map {
       case node: BatchExecNode[_] => node.translateToPlan(planner)
       case _ =>
         throw new TableException("Cannot generate BoundedStream due to an invalid logical plan. " +
             "This is a bug and should not happen. Please file an issue.")
     }
+    cleanupInternalConfigurations()
+    transformations
   }
 
   override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index c5e96b0..fe6b0cc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -64,14 +64,16 @@ class StreamPlanner(
   override protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq()
 
   override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
+    validateAndOverrideConfiguration()
     val planner = createDummyPlanner()
-
-    execGraph.getRootNodes.map {
+    val transformations = execGraph.getRootNodes.map {
       case node: StreamExecNode[_] => node.translateToPlan(planner)
       case _ =>
         throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
           "This is a bug and should not happen. Please file an issue.")
     }
+    cleanupInternalConfigurations()
+    transformations
   }
 
   override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
new file mode 100644
index 0000000..532a093
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala;
+
+/** Tests for {@link InternalConfigOptions}. */
+@RunWith(Parameterized.class)
+public class InternalConfigOptionsTest extends TableTestBase {
+
+    private TableEnvironment tEnv;
+    private PlannerBase planner;
+
+    @Parameterized.Parameters(name = "plannerMode = {0}")
+    public static Collection<String> parameters() {
+        return Arrays.asList("STREAMING", "BATCH");
+    }
+
+    @Parameterized.Parameter public String plannerMode;
+
+    @Before
+    public void setUp() {
+        if (plannerMode.equals("STREAMING")) {
+            StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
+            tEnv = util.getTableEnv();
+            planner = util.getPlanner();
+        } else {
+            BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
+            tEnv = util.getTableEnv();
+            planner = util.getPlanner();
+        }
+    }
+
+    @Test
+    public void testTranslateExecNodeGraphWithInternalTemporalConf() {
+        Table table =
+                tEnv.sqlQuery("SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME, CURRENT_TIMESTAMP");
+        RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
+        ExecNodeGraph execNodeGraph =
+                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)));
+        // PlannerBase#translateToExecNodeGraph will set internal temporal configurations and
+        // cleanup them after translate finished
+        List<Transformation<?>> transformation = planner.translateToPlan(execNodeGraph);
+        // check the translation success
+        Assert.assertEquals(1, transformation.size());
+    }
+}

Reply via email to