Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5065#discussion_r154961218
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
---
@@ -98,7 +98,9 @@ abstract class BatchTableEnvironment(
tableSource match {
case batchTableSource: BatchTableSource[_] =>
- registerTableInternal(name, new BatchTableSourceTable(batchTableSource))
+ val table = new BatchTableSourceTable(batchTableSource)
+ checkValidTableSourceType(tableSource)
+ registerTableInternal(name, table)
--- End diff --
Couldn't we just add the check in `registerTableInternal`?
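
A minimal sketch of what that could look like, assuming the registered table still exposes its source. All types below are simplified, hypothetical stand-ins rather than the actual Flink classes, and the signatures of `checkValidTableSourceType` and `registerTableInternal` are assumptions:

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; not the real Flink types.
trait TableSource
trait BatchTableSource extends TableSource

final class BatchTableSourceTable(val tableSource: BatchTableSource)

abstract class BatchTableEnvironmentSketch {
  private val tables = mutable.Map.empty[String, BatchTableSourceTable]

  // Assumed validation hook; the check in the PR may have a different shape.
  protected def checkValidTableSourceType(tableSource: TableSource): Unit

  // The check lives here, so every registration path runs it exactly once.
  protected def registerTableInternal(name: String, table: BatchTableSourceTable): Unit = {
    checkValidTableSourceType(table.tableSource)
    tables(name) = table
  }

  def registerTableSource(name: String, tableSource: TableSource): Unit =
    tableSource match {
      case batch: BatchTableSource =>
        // The call site no longer has to remember the check.
        registerTableInternal(name, new BatchTableSourceTable(batch))
      case _ =>
        throw new IllegalArgumentException("Only BatchTableSource can be registered.")
    }

  def isRegistered(name: String): Boolean = tables.contains(name)
}
```

With this layout the call site shrinks back to the single `registerTableInternal(...)` line, and any other caller of `registerTableInternal` gets the validation as well.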
---