Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3808#discussion_r114910197
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
 ---
    @@ -62,31 +62,30 @@ class GroupWindowTest extends TableTestBase {
       
//===============================================================================================
     
       @Test(expected = classOf[ValidationException])
    -  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
    +  def testInvalidProcessingTimeDefinition(): Unit = {
         val util = batchTestUtil()
    -    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
    -
    -    table
    -      .window(Tumble over 50.milli as 'w)   // require a time attribute
    -      .groupBy('w, 'string)
    -      .select('string, 'int.count)
    +    // proctime is not allowed
    +    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
       }
     
       @Test(expected = classOf[ValidationException])
    -  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
    +  def testInvalidProcessingTimeDefinition2(): Unit = {
         val util = batchTestUtil()
    -    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
    +    // proctime is not allowed
    +    util.addTable[(Long, Int, String)]('long, 'int, 'string, 
'proctime.proctime)
    +  }
     
    -    table
    -      .window(Tumble over 2.rows as 'w)   // require a time attribute
    -      .groupBy('w, 'string)
    -      .select('string, 'int.count)
    +  @Test(expected = classOf[ValidationException])
    +  def testInvalidEventTimeDefinition(): Unit = {
    +    val util = batchTestUtil()
    +    // definition must not extend schema
    +    util.addTable[(Long, Int, String)]('long, 'int, 'string, 
'rowtime.rowtime)
       }
     
       @Test
       def testEventTimeTumblingGroupWindowOverCount(): Unit = {
         val util = batchTestUtil()
    -    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
    +    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 
'string)
    --- End diff --
    
    +1, we should only replace attributes with valid time types (timestamp, 
long) for stream tables.
    I would not allow `.rowtime` for batch tables.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to