[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878662#comment-15878662 ]
ASF GitHub Bot commented on FLINK-3849: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102507787 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala --- @@ -126,21 +156,49 @@ class TableSourceTest extends TableTestBase { @Test def testStreamProjectableSourceScanNoIdentityCalc(): Unit = { - val (csvTable, tableName) = tableSource + val (tableSource, tableName) = csvTable val util = streamTestUtil() val tEnv = util.tEnv - tEnv.registerTableSource(tableName, csvTable) + tEnv.registerTableSource(tableName, tableSource) val result = tEnv .scan(tableName) .select('id, 'score, 'first) - val expected = sourceStreamTableNode(tableName, noCalcFields) + val expected = projectableSourceStreamTableNode(tableName, noCalcFields) util.verifyTable(result, expected) } @Test + def testStreamFilterableSourceScanPlanTableApi(): Unit = { + val (tableSource, tableName) = filterableTableSource + val util = streamTestUtil() + val tEnv = util.tEnv + + tEnv.registerTableSource(tableName, tableSource) + + val result = tEnv + .scan(tableName) + .select('price, 'id, 'amount) + .where("amount > 2 && price * 2 < 32") --- End diff -- An example for an unsupported predicate would be `'id.cast(BasicTypeInfo.STRING_TYPE_INFO) === "abc"`. This throws and exception when translating it to an `Expression`. As said before, unsupported expressions should be gracefully handled by not failing but instead by not offering this `RexNode` to the `FilterableTableSource` and evaluating it in the `DataSetCalc`. I would suggest to use `CAST` as an example to implement the graceful handling and adding support for it once the failure-free translation works. > Add FilterableTableSource interface and translation rule > -------------------------------------------------------- > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > def trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)