Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159777530
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala ---
    @@ -18,231 +18,574 @@
     
     package org.apache.flink.table.api
     
    -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
     import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, 
TupleTypeInfo, TypeExtractor}
     import org.apache.flink.api.scala._
    -import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
    -import org.apache.flink.table.runtime.types.CRowTypeInfo
    -import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
    +import org.apache.flink.table.api.TableEnvironmentTest._
    +import org.apache.flink.table.api.Types._
    +import org.apache.flink.table.api.scala._
    +import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATOR => 
PROCTIME}
    +import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{ROWTIME_INDICATOR => 
ROWTIME}
    +import org.apache.flink.table.utils.TableTestBase
     import org.apache.flink.types.Row
    -import org.junit.Assert.assertEquals
    +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo
    +import org.apache.flink.api.scala.typeutils.UnitTypeInfo
     import org.junit.Test
     
     class TableEnvironmentTest extends TableTestBase {
     
    -  val tEnv = new MockTableEnvironment
    -
    -  val tupleType = new TupleTypeInfo(
    -    INT_TYPE_INFO,
    -    STRING_TYPE_INFO,
    -    DOUBLE_TYPE_INFO)
    -
    -  val rowType = new RowTypeInfo(INT_TYPE_INFO, 
STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
    -
    -  val cRowType = new CRowTypeInfo(rowType)
    -
    -  val caseClassType: TypeInformation[CClass] = 
implicitly[TypeInformation[CClass]]
    -
    -  val pojoType: TypeInformation[PojoClass] = 
TypeExtractor.createTypeInfo(classOf[PojoClass])
    -
    -  val atomicType = INT_TYPE_INFO
    -
    -  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
    +  // 
----------------------------------------------------------------------------------------------
    +  // schema definition by position
    +  // 
----------------------------------------------------------------------------------------------
     
       @Test
    -  def testGetFieldInfoRow(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(rowType)
    -
    -    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => 
assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testProjectByPosition(): Unit = {
    +    val utils = Seq(streamTestUtil(), batchTestUtil())
    +
    +    utils.foreach { util =>
    +
    +      // case class
    +      util.verifySchema(
    +        util.addTable[CClass]('a, 'b, 'c),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('a, 'b),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('a),
    +        Seq("a" -> INT))
    +
    +      // row
    +      util.verifySchema(
    +        util.addTable('a, 'b, 'c)(TEST_ROW),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable('a, 'b)(TEST_ROW),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable('a)(TEST_ROW),
    +        Seq("a" -> INT))
    +
    +      // tuple
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a, 'b),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a),
    +        Seq("a" -> INT))
    +    }
       }
     
       @Test
    -  def testGetFieldInfoRowNames(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(
    -      rowType,
    -      Array(
    -        UnresolvedFieldReference("name1"),
    -        UnresolvedFieldReference("name2"),
    -        UnresolvedFieldReference("name3")
    -      ))
    -
    -    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => 
assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamProjectWithAddingTimeAttributesByPosition(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c , 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> 
PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime, 
'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, 
"proctime" -> PROCTIME))
    +
    +    // row
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'proctime.proctime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> 
PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'rowtime.rowtime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'rowtime.rowtime, 
'proctime.proctime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, 
"proctime" -> PROCTIME))
    +
    +    // tuple
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 
'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> 
PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 
'rowtime.rowtime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 
'rowtime.rowtime, 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, 
"proctime" -> PROCTIME))
       }
     
       @Test
    -  def testGetFieldInfoTuple(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(tupleType)
    -
    -    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => 
assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamAliasWithReplacingTimeAttributesByPosition(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
    +      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
    +
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('a, ('b as 'new).rowtime, 'c),
    --- End diff --
    
    Which field does `'b` reference? We could do the same with `'new.rowtime`, right?


---

Reply via email to