[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15929614#comment-15929614 ]

ASF GitHub Bot commented on FLINK-3849:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106609749
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala ---
    @@ -36,36 +43,34 @@ trait PushFilterIntoTableSourceScanRuleBase {
           filterableSource: FilterableTableSource[_],
           description: String): Unit = {
     
    -    if (filterableSource.isFilterPushedDown) {
    -      // The rule can get triggered again due to the transformed "scan => filter"
    -      // sequence created by the earlier execution of this rule when we could not
    -      // push all the conditions into the scan
    -      return
    -    }
    +    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
     
         val program = calc.getProgram
    +    val functionCatalog = FunctionCatalog.withBuiltIns
         val (predicates, unconvertedRexNodes) =
           RexProgramExtractor.extractConjunctiveConditions(
             program,
             call.builder().getRexBuilder,
    -        tableSourceTable.tableEnv.getFunctionCatalog)
    +        functionCatalog)
         if (predicates.isEmpty) {
           // no condition can be translated to expression
           return
         }
     
    -    val (newTableSource, remainingPredicates) = filterableSource.applyPredicate(predicates)
    -    // trying to apply filter push down, set the flag to true no matter whether
    -    // we actually push any filters down.
    -    newTableSource.setFilterPushedDown(true)
    +    val remainingPredicates = new util.LinkedList[Expression]()
    +    predicates.foreach(e => remainingPredicates.add(e))
    +
    +    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
    --- End diff --
    
    OK, you're right. Thanks for the example.


> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows:
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition, we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.
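
The contract described in the issue (hand a predicate to the source, get back whatever the source cannot evaluate) can be sketched roughly as below. This is only an illustration under stated assumptions: the `Expression` case classes are toy stand-ins rather than Flink's expression classes, `NumericOnlySource` is a hypothetical source, and `setPredicate` is assumed here to take a list of conjunctive predicates instead of the single `Expression` shown in the issue text.

```scala
// Toy stand-ins for illustration only; these are NOT Flink's Expression classes.
sealed trait Expression
case class GreaterThan(field: String, value: Int) extends Expression
case class Like(field: String, pattern: String) extends Expression

// Assumed variant of the proposed interface: the input is a conjunction,
// i.e. a list of predicates that are implicitly ANDed together.
trait FilterableTableSource {
  // Returns the predicates that the source cannot evaluate itself.
  def setPredicate(predicates: Seq[Expression]): Seq[Expression]
}

// Hypothetical source that can only evaluate numeric comparisons.
class NumericOnlySource extends FilterableTableSource {
  // Predicates accepted by the source; they would be applied while scanning.
  var pushed: Seq[Expression] = Seq.empty

  override def setPredicate(predicates: Seq[Expression]): Seq[Expression] = {
    // Split the conjunction into what the source supports and what it does not.
    val (supported, unsupported) = predicates.partition {
      case _: GreaterThan => true
      case _              => false
    }
    pushed = supported
    // The planner has to keep these in a filter on top of the scan.
    unsupported
  }
}
```

With such a source, a translation rule would pass the extracted conjunctive conditions to `setPredicate` and keep only the returned remainder in the filter above the scan. For example, given `GreaterThan("amount", 10)` and `Like("name", "A%")`, the toy source keeps the first predicate and hands the second back.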



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
