This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 2b486c5 [FLINK-23571][table-planner] Set internal query-start options
when translate ExecNodeGraph to Transformation (#16675)
2b486c5 is described below
commit 2b486c5c88e5b5013c53cc629979316314a484cb
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Aug 3 00:32:01 2021 +0800
[FLINK-23571][table-planner] Set internal query-start options when
translate ExecNodeGraph to Transformation (#16675)
---
.../table/planner/delegation/BatchPlanner.scala | 5 +-
.../table/planner/delegation/StreamPlanner.scala | 6 +-
.../planner/utils/InternalConfigOptionsTest.java | 82 ++++++++++++++++++++++
3 files changed, 90 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 0326b18..ca7c5e6 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -74,14 +74,17 @@ class BatchPlanner(
}
override protected def translateToPlan(execGraph: ExecNodeGraph):
util.List[Transformation[_]] = {
+ validateAndOverrideConfiguration()
val planner = createDummyPlanner()
- execGraph.getRootNodes.map {
+ val transformations = execGraph.getRootNodes.map {
case node: BatchExecNode[_] => node.translateToPlan(planner)
case _ =>
throw new TableException("Cannot generate BoundedStream due to an
invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
+ cleanupInternalConfigurations()
+ transformations
}
override def explain(operations: util.List[Operation], extraDetails:
ExplainDetail*): String = {
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index c5e96b0..fe6b0cc 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -64,14 +64,16 @@ class StreamPlanner(
override protected def getExecNodeGraphProcessors:
Seq[ExecNodeGraphProcessor] = Seq()
override protected def translateToPlan(execGraph: ExecNodeGraph):
util.List[Transformation[_]] = {
+ validateAndOverrideConfiguration()
val planner = createDummyPlanner()
-
- execGraph.getRootNodes.map {
+ val transformations = execGraph.getRootNodes.map {
case node: StreamExecNode[_] => node.translateToPlan(planner)
case _ =>
throw new TableException("Cannot generate DataStream due to an invalid
logical plan. " +
"This is a bug and should not happen. Please file an issue.")
}
+ cleanupInternalConfigurations()
+ transformations
}
override def explain(operations: util.List[Operation], extraDetails:
ExplainDetail*): String = {
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
new file mode 100644
index 0000000..532a093
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+
+import org.apache.calcite.rel.RelNode;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala;
+
+/** Tests for {@link InternalConfigOptions}. */
+@RunWith(Parameterized.class)
+public class InternalConfigOptionsTest extends TableTestBase {
+
+ private TableEnvironment tEnv;
+ private PlannerBase planner;
+
+ @Parameterized.Parameters(name = "plannerMode = {0}")
+ public static Collection<String> parameters() {
+ return Arrays.asList("STREAMING", "BATCH");
+ }
+
+ @Parameterized.Parameter public String plannerMode;
+
+ @Before
+ public void setUp() {
+ if (plannerMode.equals("STREAMING")) {
+ StreamTableTestUtil util =
streamTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+ planner = util.getPlanner();
+ } else {
+ BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+ planner = util.getPlanner();
+ }
+ }
+
+ @Test
+ public void testTranslateExecNodeGraphWithInternalTemporalConf() {
+ Table table =
+ tEnv.sqlQuery("SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME,
CURRENT_TIMESTAMP");
+ RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
+ ExecNodeGraph execNodeGraph =
+
planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)));
+ // PlannerBase#translateToPlan sets the internal temporal configurations and
+ // cleans them up once the translation has finished
+ List<Transformation<?>> transformation =
planner.translateToPlan(execNodeGraph);
+ // check that the translation succeeded
+ Assert.assertEquals(1, transformation.size());
+ }
+}