[ https://issues.apache.org/jira/browse/FLINK-6483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004575#comment-16004575 ]
ASF GitHub Bot commented on FLINK-6483:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3862#discussion_r115719405
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala ---
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Tests for [[RelTimeIndicatorConverter]].
+ */
+class RelTimeIndicatorConverterTest extends TableTestBase {
+
+ @Test
+ def testSimpleMaterialization(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+ val result = t
+ .select('rowtime.floor(TimeIntervalUnit.DAY) as 'rowtime, 'long)
+ .filter('long > 0)
+ .select('rowtime)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS
rowtime"),
+ term("where", ">(long, 0)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testSelectAll(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+ val result = t.select('*)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long",
"int",
+ "TIME_MATERIALIZATION(proctime) AS proctime")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testFilteringOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
+ .select('rowtime)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
+ term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02
12:11:11)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+ val result = t
+ .groupBy('rowtime)
+ .select('long.count)
+
+ util.verifyTable(result, "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingOnProctimeSql(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
+
+ val result = util.tEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY
proctime")
+
+ util.verifyTable(result, "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testAggregationOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .groupBy('long)
+ .select('rowtime.count)
+
+ util.verifyTable(result, "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testAggregationOnProctimeSql(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
+
+ val result = util.tEnv.sql("SELECT COUNT(proctime) FROM MyTable GROUP
BY long")
+
+ util.verifyTable(result, "FAIL")
+ }
+
+ @Test
+ def testTableFunction(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+ val func = new TableFunc
+
+ val result = t.join(func('rowtime, 'proctime) as 's).select('rowtime, 'proctime, 's)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamCorrelate",
+ streamTableNode(0),
+ term("invocation",
+ s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0),
TIME_MATERIALIZATION($$3))"),
+ term("function", func),
+ term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long,
INTEGER int, " +
+ "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+ term("joinType", "INNER")
+ ),
+ term("select",
+ "TIME_MATERIALIZATION(rowtime) AS rowtime",
+ "TIME_MATERIALIZATION(proctime) AS proctime",
+ "s")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWindowGroupOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .window(Tumble over 100.millis on 'rowtime as 'w)
+ .groupBy('w, 'rowtime)
+ .select('w.start, 'rowtime, 'int.sum)
+
+ util.verifyTable(result, "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWindowAggregationOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .window(Tumble over 100.millis on 'rowtime as 'w)
+ .groupBy('w, 'long)
+ .select('w.start, 'long, 'rowtime.count)
+
+ util.verifyTable(result, "FAIL")
+ }
+
+ @Test
+ def testWindowStartEnd(): Unit = {
--- End diff ---
This tests only the window end.
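For illustration, a minimal sketch of a test that covers both the window start and the window end could look as follows. The method name testWindowStartAndEnd, the '100.millis window size, and the expected-plan placeholder are assumptions rather than part of this PR, and the sketch presumes it sits in the same test class with the imports shown above:

  @Test
  def testWindowStartAndEnd(): Unit = {
    val util = streamTestUtil()
    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)

    // group on a tumbling event-time window and select both its start and its end
    val result = t
      .window(Tumble over 100.millis on 'rowtime as 'w)
      .groupBy('w, 'long)
      .select('w.start, 'w.end, 'long, 'int.sum)

    // placeholder: the expected plan string would have to assert on both the
    // window start and window end properties; the actual planner output is not
    // part of this diff, so it is left unimplemented here
    val expected: String = ???
    util.verifyTable(result, expected)
  }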
> Support time materialization
> ----------------------------
>
> Key: FLINK-6483
> URL: https://issues.apache.org/jira/browse/FLINK-6483
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> FLINK-5884 added support for time indicators. However, some features are
> still missing, e.g., the materialization of metadata timestamps.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)