[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16312189#comment-16312189
 ] 

ASF GitHub Bot commented on FLINK-8203:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159777530
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 ---
    @@ -18,231 +18,574 @@
     
     package org.apache.flink.table.api
     
    -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
     import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, 
TupleTypeInfo, TypeExtractor}
     import org.apache.flink.api.scala._
    -import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
    -import org.apache.flink.table.runtime.types.CRowTypeInfo
    -import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
    +import org.apache.flink.table.api.TableEnvironmentTest._
    +import org.apache.flink.table.api.Types._
    +import org.apache.flink.table.api.scala._
    +import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATOR => 
PROCTIME}
    +import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{ROWTIME_INDICATOR => 
ROWTIME}
    +import org.apache.flink.table.utils.TableTestBase
     import org.apache.flink.types.Row
    -import org.junit.Assert.assertEquals
    +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo
    +import org.apache.flink.api.scala.typeutils.UnitTypeInfo
     import org.junit.Test
     
     class TableEnvironmentTest extends TableTestBase {
     
    -  val tEnv = new MockTableEnvironment
    -
    -  val tupleType = new TupleTypeInfo(
    -    INT_TYPE_INFO,
    -    STRING_TYPE_INFO,
    -    DOUBLE_TYPE_INFO)
    -
    -  val rowType = new RowTypeInfo(INT_TYPE_INFO, 
STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
    -
    -  val cRowType = new CRowTypeInfo(rowType)
    -
    -  val caseClassType: TypeInformation[CClass] = 
implicitly[TypeInformation[CClass]]
    -
    -  val pojoType: TypeInformation[PojoClass] = 
TypeExtractor.createTypeInfo(classOf[PojoClass])
    -
    -  val atomicType = INT_TYPE_INFO
    -
    -  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
    +  // 
----------------------------------------------------------------------------------------------
    +  // schema definition by position
    +  // 
----------------------------------------------------------------------------------------------
     
       @Test
    -  def testGetFieldInfoRow(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(rowType)
    -
    -    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => 
assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testProjectByPosition(): Unit = {
    +    val utils = Seq(streamTestUtil(), batchTestUtil())
    +
    +    utils.foreach { util =>
    +
    +      // case class
    +      util.verifySchema(
    +        util.addTable[CClass]('a, 'b, 'c),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('a, 'b),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[CClass]('a),
    +        Seq("a" -> INT))
    +
    +      // row
    +      util.verifySchema(
    +        util.addTable('a, 'b, 'c)(TEST_ROW),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable('a, 'b)(TEST_ROW),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable('a)(TEST_ROW),
    +        Seq("a" -> INT))
    +
    +      // tuple
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c),
    +        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a, 'b),
    +        Seq("a" -> INT, "b" -> STRING))
    +
    +      util.verifySchema(
    +        util.addTable[JTuple3[Int, String, Double]]('a),
    +        Seq("a" -> INT))
    +    }
       }
     
       @Test
    -  def testGetFieldInfoRowNames(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(
    -      rowType,
    -      Array(
    -        UnresolvedFieldReference("name1"),
    -        UnresolvedFieldReference("name2"),
    -        UnresolvedFieldReference("name3")
    -      ))
    -
    -    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => 
assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamProjectWithAddingTimeAttributesByPosition(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c , 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> 
PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime, 
'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, 
"proctime" -> PROCTIME))
    +
    +    // row
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'proctime.proctime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> 
PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'rowtime.rowtime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable('a, 'b, 'c, 'rowtime.rowtime, 
'proctime.proctime)(TEST_ROW),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, 
"proctime" -> PROCTIME))
    +
    +    // tuple
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 
'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> 
PROCTIME))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 
'rowtime.rowtime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
    +
    +    util.verifySchema(
    +      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 
'rowtime.rowtime, 'proctime.proctime),
    +      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, 
"proctime" -> PROCTIME))
       }
     
       @Test
    -  def testGetFieldInfoTuple(): Unit = {
    -    val fieldInfo = tEnv.getFieldInfo(tupleType)
    -
    -    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => 
assertEquals(x._2, x._1))
    -    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
    +  def testStreamAliasWithReplacingTimeAttributesByPosition(): Unit = {
    +    val util = streamTestUtil()
    +
    +    // case class
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
    +      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
    +
    +    util.verifySchema(
    +      util.addTable[CClassWithTime]('a, ('b as 'new).rowtime, 'c),
    --- End diff --
    
    which field does `'b` reference? We could do the same with `'new.rowtime`, 
right?


> Make schema definition of DataStream/DataSet to Table conversion more flexible
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8203
>                 URL: https://issues.apache.org/jira/browse/FLINK-8203
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations 
> on how the fields can be defined that also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require schema definition based on the position of 
> fields. POJO types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., event time attribute must replace an 
> existing field or be appended and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can define 
> proctime and eventtime attributes at arbitrary positions using arbitrary 
> names (except those that exist in the result schema). This mode can be used 
> for any input type, including POJOs. This mode is used if all field 
> references exist in the input type.
> 2. Reference input fields by position: Field references might not refer to 
> existing fields in the input type. In this mode, fields are simply renamed. 
> Event-time attributes can replace the field on their position in the input 
> data (if it is of correct type) or be appended at the end. Proctime 
> attributes must be appended at the end. This mode can only be used if the 
> input type has a defined field order (tuple, case class, Row).
> We need to add more tests that check for all combinations of input types and 
> schema definition modes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to