Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159777530
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
---
@@ -18,231 +18,574 @@
package org.apache.flink.table.api
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo,
TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
+import org.apache.flink.table.api.TableEnvironmentTest._
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATOR =>
PROCTIME}
+import
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{ROWTIME_INDICATOR =>
ROWTIME}
+import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala.typeutils.UnitTypeInfo
import org.junit.Test
class TableEnvironmentTest extends TableTestBase {
- val tEnv = new MockTableEnvironment
-
- val tupleType = new TupleTypeInfo(
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- DOUBLE_TYPE_INFO)
-
- val rowType = new RowTypeInfo(INT_TYPE_INFO,
STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
- val cRowType = new CRowTypeInfo(rowType)
-
- val caseClassType: TypeInformation[CClass] =
implicitly[TypeInformation[CClass]]
-
- val pojoType: TypeInformation[PojoClass] =
TypeExtractor.createTypeInfo(classOf[PojoClass])
-
- val atomicType = INT_TYPE_INFO
-
- val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+ //
----------------------------------------------------------------------------------------------
+ // schema definition by position
+ //
----------------------------------------------------------------------------------------------
@Test
- def testGetFieldInfoRow(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(rowType)
-
- fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testProjectByPosition(): Unit = {
+ val utils = Seq(streamTestUtil(), batchTestUtil())
+
+ utils.foreach { util =>
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClass]('a),
+ Seq("a" -> INT))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b, 'c)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('a, 'b)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable('a)(TEST_ROW),
+ Seq("a" -> INT))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a),
+ Seq("a" -> INT))
+ }
}
@Test
- def testGetFieldInfoRowNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- rowType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamProjectWithAddingTimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c , 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" ->
PROCTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime,
'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME,
"proctime" -> PROCTIME))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'proctime.proctime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" ->
PROCTIME))
+
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'rowtime.rowtime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'rowtime.rowtime,
'proctime.proctime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME,
"proctime" -> PROCTIME))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c,
'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" ->
PROCTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c,
'rowtime.rowtime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c,
'rowtime.rowtime, 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME,
"proctime" -> PROCTIME))
}
@Test
- def testGetFieldInfoTuple(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(tupleType)
-
- fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamAliasWithReplacingTimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClassWithTime]('a, ('b as 'new).rowtime, 'c),
--- End diff --
which field does `'b` reference? We could do the same with `'new.rowtime`,
right?
---