[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878662#comment-15878662
 ] 

ASF GitHub Bot commented on FLINK-3849:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r102507787
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala ---
    @@ -126,21 +156,49 @@ class TableSourceTest extends TableTestBase {
     
       @Test
       def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
    -    val (csvTable, tableName) = tableSource
    +    val (tableSource, tableName) = csvTable
         val util = streamTestUtil()
         val tEnv = util.tEnv
     
    -    tEnv.registerTableSource(tableName, csvTable)
    +    tEnv.registerTableSource(tableName, tableSource)
     
         val result = tEnv
           .scan(tableName)
           .select('id, 'score, 'first)
     
    -    val expected = sourceStreamTableNode(tableName, noCalcFields)
    +    val expected = projectableSourceStreamTableNode(tableName, noCalcFields)
         util.verifyTable(result, expected)
       }
     
       @Test
    +  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
    +    val (tableSource, tableName) = filterableTableSource
    +    val util = streamTestUtil()
    +    val tEnv = util.tEnv
    +
    +    tEnv.registerTableSource(tableName, tableSource)
    +
    +    val result = tEnv
    +      .scan(tableName)
    +      .select('price, 'id, 'amount)
    +      .where("amount > 2 && price * 2 < 32")
    --- End diff --
    
    An example of an unsupported predicate would be
`'id.cast(BasicTypeInfo.STRING_TYPE_INFO) === "abc"`. This throws an exception
when it is translated to an `Expression`.
    
    As said before, unsupported expressions should be handled gracefully: instead
of failing, the translation should not offer this `RexNode` to the
`FilterableTableSource` and should leave it to be evaluated in the `DataSetCalc`.
    I would suggest using `CAST` as the example for implementing the graceful
handling, and adding support for it once the failure-free translation works.
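
    A minimal sketch of the graceful handling described above, assuming
simplified stand-in types: `Rex`, `Expr`, `GracefulPushDown` and all of their
members are hypothetical names for illustration, not Calcite's `RexNode` or
the Table API `Expression` classes. The translation returns `None` for
anything it does not understand (here `CAST`), and only the conjuncts that
translate are offered to the source:

```scala
// Hypothetical, simplified stand-ins for Calcite's RexNode tree (not the real classes).
sealed trait Rex
case class RexField(name: String) extends Rex
case class RexLiteral(value: Any) extends Rex
case class RexCall(op: String, operands: List[Rex]) extends Rex

// Hypothetical, simplified stand-ins for the Table API Expression tree.
sealed trait Expr
case class Field(name: String) extends Expr
case class Literal(value: Any) extends Expr
case class Binary(op: String, left: Expr, right: Expr) extends Expr

object GracefulPushDown {
  // Binary operators this sketch can translate. CAST is deliberately absent.
  private val supported = Set(">", "<", "=", "*")

  // Returns None instead of throwing when any part of the tree is unsupported.
  def translate(rex: Rex): Option[Expr] = rex match {
    case RexField(n)   => Some(Field(n))
    case RexLiteral(v) => Some(Literal(v))
    case RexCall(op, List(l, r)) if supported(op) =>
      for (le <- translate(l); re <- translate(r)) yield Binary(op, le, re)
    case _ => None
  }

  // Flattens nested AND calls into a list of conjuncts.
  def conjuncts(rex: Rex): List[Rex] = rex match {
    case RexCall("AND", ops) => ops.flatMap(conjuncts)
    case other               => List(other)
  }

  // Splits a predicate into the expressions that can be offered to the source
  // and the untranslated conjuncts that must stay in the calc.
  def split(predicate: Rex): (List[Expr], List[Rex]) = {
    val attempts  = conjuncts(predicate).map(c => (c, translate(c)))
    val offered   = attempts.collect { case (_, Some(e)) => e }
    val remaining = attempts.collect { case (c, None) => c }
    (offered, remaining)
  }
}
```

    With a predicate such as `amount > 2 AND CAST(id) = 'abc'`, `split` would
offer only the `amount > 2` part to the source and keep the `CAST` comparison
for the calc. Once the translation learns `CAST`, that conjunct moves to the
offered side without any change to the splitting logic.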


> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
