aljoscha commented on a change in pull request #8739: 
[FLINK-12853][table-api][table-planner] TableSourceUtils#validateTableSource 
method to common module
URL: https://github.com/apache/flink/pull/8739#discussion_r294347662
 
 

 ##########
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
 ##########
 @@ -128,6 +129,58 @@ class TableSourceValidationTest {
     tEnv.registerTableSource("testTable", ts)
   }
 
+  @Test
+  def testDefinedRowtimeDoesNotExist(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException
+      .expectMessage(
+        "Found a rowtime attribute for field 'rowtime' but it does not exist 
in the Table")
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount"),
+      Array(Types.LONG, Types.STRING, Types.INT))
+
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.SQL_TIMESTAMP(), Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "rowtime", "amount"))
+
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime 
= "rowtime")
+
+    // should fail because rowtime field has invalid type
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test
+  def testDefinedProctimeDoesNotExist(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException
+      .expectMessage(
+        "Found a proctime attribute for field 'proctime' but it does not exist 
in the Table")
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount"),
+      Array(Types.LONG, Types.STRING, Types.INT))
+
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.SQL_TIMESTAMP(), Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "proctime", "amount"))
+
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), proctime 
= "proctime")
+
+    // should fail because rowtime field has invalid type
 
 Review comment:
   nit: this should be proctime, I guess 😅 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to