[
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878662#comment-15878662
]
ASF GitHub Bot commented on FLINK-3849:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3166#discussion_r102507787
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
---
@@ -126,21 +156,49 @@ class TableSourceTest extends TableTestBase {
@Test
def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
- val (csvTable, tableName) = tableSource
+ val (tableSource, tableName) = csvTable
val util = streamTestUtil()
val tEnv = util.tEnv
- tEnv.registerTableSource(tableName, csvTable)
+ tEnv.registerTableSource(tableName, tableSource)
val result = tEnv
.scan(tableName)
.select('id, 'score, 'first)
- val expected = sourceStreamTableNode(tableName, noCalcFields)
+ val expected = projectableSourceStreamTableNode(tableName, noCalcFields)
util.verifyTable(result, expected)
}
@Test
+ def testStreamFilterableSourceScanPlanTableApi(): Unit = {
+ val (tableSource, tableName) = filterableTableSource
+ val util = streamTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val result = tEnv
+ .scan(tableName)
+ .select('price, 'id, 'amount)
+ .where("amount > 2 && price * 2 < 32")
--- End diff --
An example of an unsupported predicate is
`'id.cast(BasicTypeInfo.STRING_TYPE_INFO) === "abc"`. This throws an exception
when it is translated to an `Expression`.
As said before, unsupported expressions should be handled gracefully: instead of
failing, the translation should not offer this `RexNode` to the
`FilterableTableSource` and should leave it to be evaluated in the `DataSetCalc`.
I would suggest using `CAST` as the example for implementing the graceful
handling, and adding support for it once the failure-free translation works.
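To make the suggestion concrete, here is a minimal sketch of the graceful handling, not the PR's actual code. `Rex`, `Expr`, `GracefulPushDown`, `toExpr` and `split` are hypothetical, simplified stand-ins for Calcite's `RexNode`, the Table API's `Expression` and the translation logic; none of them are real Flink or Calcite APIs. The idea is to attempt the translation per conjunct and keep every conjunct that fails to translate out of the push-down.

```scala
import scala.util.Try

// Hypothetical, simplified stand-in for Calcite's RexNode.
sealed trait Rex
case class RexInput(name: String) extends Rex
case class RexLiteral(value: Any) extends Rex
case class RexCall(op: String, operands: List[Rex]) extends Rex

// Hypothetical, simplified stand-in for the Table API's Expression.
sealed trait Expr
case class Field(name: String) extends Expr
case class Literal(value: Any) extends Expr
case class Call(op: String, args: List[Expr]) extends Expr

object GracefulPushDown {
  // Assumed set of operators the translation understands; CAST is not among them.
  private val supportedOps = Set(">", "<", "=", "*")

  // Translation that throws for unsupported calls, mirroring the current behavior.
  def toExpr(rex: Rex): Expr = rex match {
    case RexInput(name)   => Field(name)
    case RexLiteral(value) => Literal(value)
    case RexCall(op, operands) if supportedOps(op) =>
      Call(op, operands.map(toExpr))
    case RexCall(op, _) =>
      throw new UnsupportedOperationException(s"Unsupported call: $op")
  }

  // Splits the conjuncts of a predicate: conjuncts that translate without
  // failure are offered to the source, all others stay behind for the calc.
  def split(conjuncts: List[Rex]): (List[Expr], List[Rex]) = {
    val attempts  = conjuncts.map(c => c -> Try(toExpr(c)).toOption)
    val pushable  = attempts.collect { case (_, Some(expr)) => expr }
    val remaining = attempts.collect { case (rex, None) => rex }
    (pushable, remaining)
  }
}
```

For a predicate such as `amount > 2 && CAST(id) = "abc"`, `split` would return the translated `amount > 2` as pushable and the untouched `CAST` conjunct as remaining, so the plan is still produced and only the supported part reaches the source.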
> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
> // returns the part of the predicate that the source does not support
> def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)