Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4710#discussion_r141130569
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
---
@@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase {
)
util.verifyTable(t, expected)
}
+
+ @Test
+ def testProjectableProcTimeTableSource(): Unit = {
+ // ensures that projection is not pushed into table source with proctime indicators
+ val util = streamTestUtil()
+
+ val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
+ override def projectFields(fields: Array[Int]): TableSource[Row] = {
+ // ensure this method is not called!
+ Assert.fail()
+ null.asInstanceOf[TableSource[Row]]
+ }
+ }
+ util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+
+ val t = util.tableEnv.scan("PTimeTable")
+ .select('name, 'val)
+ .where('val > 10)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
+ term("select", "name", "val"),
+ term("where", ">(val, 10)")
+ )
+ util.verifyTable(t, expected)
+ }
+
+ @Test
+ def testProjectableRowTimeTableSource(): Unit = {
--- End diff --
Yes, that should not be a problem. Projection push-down is not possible here
because part of the table's schema is constructed inside the Table API
(e.g., the proctime attribute is appended there).
---