Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5132#discussion_r159779000
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
---
@@ -18,231 +18,574 @@
package org.apache.flink.table.api
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo,
TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
+import org.apache.flink.table.api.TableEnvironmentTest._
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATOR =>
PROCTIME}
+import
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{ROWTIME_INDICATOR =>
ROWTIME}
+import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala.typeutils.UnitTypeInfo
import org.junit.Test
class TableEnvironmentTest extends TableTestBase {
- val tEnv = new MockTableEnvironment
-
- val tupleType = new TupleTypeInfo(
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- DOUBLE_TYPE_INFO)
-
- val rowType = new RowTypeInfo(INT_TYPE_INFO,
STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
- val cRowType = new CRowTypeInfo(rowType)
-
- val caseClassType: TypeInformation[CClass] =
implicitly[TypeInformation[CClass]]
-
- val pojoType: TypeInformation[PojoClass] =
TypeExtractor.createTypeInfo(classOf[PojoClass])
-
- val atomicType = INT_TYPE_INFO
-
- val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+ //
----------------------------------------------------------------------------------------------
+ // schema definition by position
+ //
----------------------------------------------------------------------------------------------
@Test
- def testGetFieldInfoRow(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(rowType)
-
- fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testProjectByPosition(): Unit = {
+ val utils = Seq(streamTestUtil(), batchTestUtil())
+
+ utils.foreach { util =>
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClass]('a),
+ Seq("a" -> INT))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b, 'c)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('a, 'b)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable('a)(TEST_ROW),
+ Seq("a" -> INT))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a),
+ Seq("a" -> INT))
+ }
}
@Test
- def testGetFieldInfoRowNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- rowType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamProjectWithAddingTimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c , 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" ->
PROCTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime,
'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME,
"proctime" -> PROCTIME))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'proctime.proctime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" ->
PROCTIME))
+
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'rowtime.rowtime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'rowtime.rowtime,
'proctime.proctime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME,
"proctime" -> PROCTIME))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c,
'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" ->
PROCTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c,
'rowtime.rowtime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c,
'rowtime.rowtime, 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME,
"proctime" -> PROCTIME))
}
@Test
- def testGetFieldInfoTuple(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(tupleType)
-
- fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamAliasWithReplacingTimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClassWithTime]('a, ('b as 'new).rowtime, 'c),
+ Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b.rowtime, 'c)(TEST_ROW_WITH_TIME),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ util.verifySchema(
+ util.addTable('a, ('b as 'new).rowtime, 'c)(TEST_ROW_WITH_TIME),
+ Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('a, ('b as 'new).rowtime,
'c),
+ Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('a, ('b as 'new).rowtime,
'c),
+ Seq("a" -> INT, "new" -> ROWTIME, "c" -> STRING))
}
- @Test
- def testGetFieldInfoCClass(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(caseClassType)
+ //
----------------------------------------------------------------------------------------------
+ // schema definition by name
+ //
----------------------------------------------------------------------------------------------
- fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ @Test
+ def testProjectByName(): Unit = {
+ val utils = Seq(streamTestUtil(), batchTestUtil())
+
+ utils.foreach { util =>
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int](),
+ Seq("f0" -> INT))
+
+ util.verifySchema(
+ util.addTable[Int]('myint),
+ Seq("myint" -> INT))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass](),
+ Seq("cf1" -> INT, "cf2" -> STRING, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf2),
+ Seq("cf1" -> INT, "cf2" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf3),
+ Seq("cf1" -> INT, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf3, 'cf1),
+ Seq("cf3" -> DOUBLE, "cf1" -> INT))
+
+ // row
+ util.verifySchema(
+ util.addTable()(TEST_ROW),
+ Seq("rf1" -> INT, "rf2" -> STRING, "rf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('rf1, 'rf2)(TEST_ROW),
+ Seq("rf1" -> INT, "rf2" -> STRING))
+
+ util.verifySchema(
+ util.addTable('rf1, 'rf3)(TEST_ROW),
+ Seq("rf1" -> INT, "rf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rf1)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rf1" -> INT))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]](),
+ Seq("f0" -> INT, "f1" -> STRING, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f1),
+ Seq("f0" -> INT, "f1" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f2),
+ Seq("f0" -> INT, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f2, 'f0),
+ Seq("f2" -> DOUBLE, "f0" -> INT))
+
+ // pojo
+ util.verifySchema(
+ util.addTable[PojoClass](),
+ Seq("pf1" -> INT, "pf2" -> STRING, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'pf2),
+ Seq("pf1" -> INT, "pf2" -> STRING))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'pf3),
+ Seq("pf1" -> INT, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf3, 'pf1),
+ Seq("pf3" -> DOUBLE, "pf1" -> INT))
+
+ // generic
+ util.verifySchema(
+ util.addTable[Class[_]]('mygeneric),
+ Seq("mygeneric" -> new
GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ util.verifySchema(
+ util.addTable[Class[_]](),
+ Seq("f0" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ // any type info
+ util.verifySchema(
+ util.addTable[Unit](),
+ Seq("f0" -> new UnitTypeInfo()))
+
+ util.verifySchema(
+ util.addTable[Unit]('unit),
+ Seq("unit" -> new UnitTypeInfo()))
+ }
}
@Test
- def testGetFieldInfoPojo(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(pojoType)
-
- fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamProjectWithAddingTimeAttributesByName(): Unit = {
+ val util = streamTestUtil()
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int]('proctime.proctime, 'myint),
+ Seq("proctime" -> PROCTIME, "myint" -> INT))
+
+ util.verifySchema(
+ util.addTable[Int]('rowtime.rowtime, 'myint),
+ Seq("rowtime" -> ROWTIME, "myint" -> INT))
+
+ util.verifySchema(
+ util.addTable[Int]('myint, 'proctime.proctime),
+ Seq("myint" -> INT, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[Int]('myint, 'rowtime.rowtime),
+ Seq("myint" -> INT, "rowtime" -> ROWTIME))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('proctime.proctime, 'cf1, 'cf3),
+ Seq("proctime" -> PROCTIME, "cf1" -> INT, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('rowtime.rowtime, 'cf3, 'cf1),
+ Seq("rowtime" -> ROWTIME, "cf3" -> DOUBLE, "cf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'proctime.proctime, 'cf3),
+ Seq("cf1" -> INT, "proctime" -> PROCTIME, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf3, 'rowtime.rowtime, 'cf1),
+ Seq("cf3" -> DOUBLE, "rowtime" -> ROWTIME, "cf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf3, 'proctime.proctime),
+ Seq("cf1" -> INT, "cf3" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf3, 'cf1, 'rowtime.rowtime),
+ Seq("cf3" -> DOUBLE, "cf1" -> INT, "rowtime" -> ROWTIME))
+
+ // row
+ util.verifySchema(
+ util.addTable('proctime.proctime, 'rf1, 'rf3)(TEST_ROW),
+ Seq("proctime" -> PROCTIME, "rf1" -> INT, "rf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('rowtime.rowtime, 'rf3, 'rf1)(TEST_ROW),
+ Seq("rowtime" -> ROWTIME, "rf3" -> DOUBLE, "rf1" -> INT))
+
+ util.verifySchema(
+ util.addTable('rf3, 'proctime.proctime, 'rf1)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "proctime" -> PROCTIME, "rf1" -> INT))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rowtime.rowtime, 'rf1)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rowtime" -> ROWTIME, "rf1" -> INT))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rf1, 'proctime.proctime)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rf1" -> INT, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rf1, 'rowtime.rowtime)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rf1" -> INT, "rowtime" -> ROWTIME))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('proctime.proctime, 'f0,
'f2),
+ Seq("proctime" -> PROCTIME, "f0" -> INT, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('rowtime.rowtime, 'f2,
'f0),
+ Seq("rowtime" -> ROWTIME, "f2" -> DOUBLE, "f0" -> INT))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'proctime.proctime,
'f2),
+ Seq("f0" -> INT, "proctime" -> PROCTIME, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f2, 'rowtime.rowtime,
'f0),
+ Seq("f2" -> DOUBLE, "rowtime" -> ROWTIME, "f0" -> INT))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f2,
'proctime.proctime),
+ Seq("f0" -> INT, "f2" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f2, 'f0,
'rowtime.rowtime),
+ Seq("f2" -> DOUBLE, "f0" -> INT, "rowtime" -> ROWTIME))
+
+ // pojo
+ util.verifySchema(
+ util.addTable[PojoClass]('proctime.proctime, 'pf1, 'pf3),
+ Seq("proctime" -> PROCTIME, "pf1" -> INT, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('rowtime.rowtime, 'pf3, 'pf1),
+ Seq("rowtime" -> ROWTIME, "pf3" -> DOUBLE, "pf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'proctime.proctime, 'pf3),
+ Seq("pf1" -> INT, "proctime" -> PROCTIME, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf3, 'rowtime.rowtime, 'pf1),
+ Seq("pf3" -> DOUBLE, "rowtime" -> ROWTIME, "pf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'pf3, 'proctime.proctime),
+ Seq("pf1" -> INT, "pf3" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf3, 'pf1, 'rowtime.rowtime),
+ Seq("pf3" -> DOUBLE, "pf1" -> INT, "rowtime" -> ROWTIME))
+
+ // generic
+ util.verifySchema(
+ util.addTable[Class[_]]('proctime.proctime, 'mygeneric),
+ Seq("proctime" -> PROCTIME, "mygeneric" -> new
GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ util.verifySchema(
+ util.addTable[Class[_]]('rowtime.rowtime, 'mygeneric),
+ Seq("rowtime" -> ROWTIME, "mygeneric" -> new
GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ util.verifySchema(
+ util.addTable[Class[_]]('mygeneric, 'proctime.proctime),
+ Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]),
"proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[Class[_]]('mygeneric, 'rowtime.rowtime),
+ Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]),
"rowtime" -> ROWTIME))
+
+ // any type info
+ util.verifySchema(
+ util.addTable[Unit]('proctime.proctime, 'unit),
+ Seq("proctime" -> PROCTIME, "unit" -> new UnitTypeInfo()))
+
+ util.verifySchema(
+ util.addTable[Unit]('rowtime.rowtime, 'unit),
+ Seq("rowtime" -> ROWTIME, "unit" -> new UnitTypeInfo()))
+
+ util.verifySchema(
+ util.addTable[Unit]('unit, 'proctime.proctime),
+ Seq("unit" -> new UnitTypeInfo(), "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[Unit]('unit, 'rowtime.rowtime),
+ Seq("unit" -> new UnitTypeInfo(), "rowtime" -> ROWTIME))
}
@Test
- def testGetFieldInfoAtomic(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(atomicType)
-
- fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamProjectWithReplacingTimeAttributesByName(): Unit = {
+ val util = streamTestUtil()
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int]('myint.proctime),
+ Seq("myint" -> PROCTIME))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, 'cf3.proctime, 'cf2),
+ Seq("cf1" -> INT, "cf3" -> PROCTIME, "cf2" -> LONG))
+
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, 'cf3.rowtime, 'cf2),
+ Seq("cf1" -> INT, "cf3" -> ROWTIME, "cf2" -> LONG))
+
+ // row
+ util.verifySchema(
+ util.addTable('rf1, 'rf3.proctime, 'rf2)(TEST_ROW_WITH_TIME),
+ Seq("rf1" -> INT, "rf3" -> PROCTIME, "rf2" -> LONG))
+
+ util.verifySchema(
+ util.addTable('rf1, 'rf3.rowtime, 'rf2)(TEST_ROW_WITH_TIME),
+ Seq("rf1" -> INT, "rf3" -> ROWTIME, "rf2" -> LONG))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('f0, 'f2.proctime, 'f1),
+ Seq("f0" -> INT, "f2" -> PROCTIME, "f1" -> LONG))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('f0, 'f2.rowtime, 'f1),
+ Seq("f0" -> INT, "f2" -> ROWTIME, "f1" -> LONG))
}
@Test
- def testGetFieldInfoTupleNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- tupleType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testAliasByName(): Unit = {
+ val utils = Seq(streamTestUtil(), batchTestUtil())
+
+ utils.foreach { util =>
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int]('myint as 'new),
+ Seq("new" -> INT))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf3 as 'new, 'cf2),
+ Seq("cf1" -> INT, "new" -> DOUBLE, "cf2" -> STRING))
+
+ // row
+ util.verifySchema(
+ util.addTable('rf1, 'rf3 as 'new, 'rf2)(TEST_ROW),
+ Seq("rf1" -> INT, "new" -> DOUBLE, "rf2" -> STRING))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f2 as 'new, 'f1),
+ Seq("f0" -> INT, "new" -> DOUBLE, "f1" -> STRING))
+ }
}
@Test
- def testGetFieldInfoCClassNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- caseClassType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x =>
assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamAliasWithAddingTimeAttributesByName(): Unit = {
+ val util = streamTestUtil()
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int](('myint as 'new).proctime),
+ Seq("new" -> PROCTIME))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, ('newnew as 'new).proctime,
'cf2),
--- End diff --
I think we should add checks that `'newnew` is actually present in the
schema. For `proctime` `as` should be forbidden, IMO.
---