[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927353#comment-15927353
 ] 

ASF GitHub Bot commented on FLINK-3849:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3520#discussion_r106327707
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushFilterIntoTableSourceScanRuleBase.scala
 ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.common
    +
    +import org.apache.calcite.plan.RelOptRuleCall
    +import org.apache.calcite.rel.core.Calc
    +import org.apache.calcite.rex.RexProgram
    +import org.apache.flink.table.plan.nodes.TableSourceScan
    +import org.apache.flink.table.plan.schema.TableSourceTable
    +import org.apache.flink.table.plan.util.RexProgramExtractor
    +import org.apache.flink.table.sources.FilterableTableSource
    +
    +trait PushFilterIntoTableSourceScanRuleBase {
    +
    +  private[flink] def pushFilterIntoScan(
    +      call: RelOptRuleCall,
    +      calc: Calc,
    +      scan: TableSourceScan,
    +      tableSourceTable: TableSourceTable[_],
    +      filterableSource: FilterableTableSource[_],
    +      description: String): Unit = {
    +
    +    if (filterableSource.isFilterPushedDown) {
    +      // The rule can get triggered again due to the transformed "scan => 
filter"
    +      // sequence created by the earlier execution of this rule when we 
could not
    +      // push all the conditions into the scan
    +      return
    +    }
    +
    +    val program = calc.getProgram
    +    val (predicates, unconvertedRexNodes) =
    +      RexProgramExtractor.extractConjunctiveConditions(
    +        program,
    +        call.builder().getRexBuilder,
    +        tableSourceTable.tableEnv.getFunctionCatalog)
    +    if (predicates.isEmpty) {
    +      // no condition can be translated to expression
    +      return
    +    }
    +
    +    val (newTableSource, remainingPredicates) = 
filterableSource.applyPredicate(predicates)
    +    // trying to apply filter push down, set the flag to true no matter 
whether
    +    // we actually push any filters down.
    +    newTableSource.setFilterPushedDown(true)
    +
    +    // check whether framework still need to do a filter
    +    val relBuilder = call.builder()
    +    val remainingCondition = {
    +      if (remainingPredicates.nonEmpty || unconvertedRexNodes.nonEmpty) {
    +        relBuilder.push(scan)
    +        (remainingPredicates.map(expr => expr.toRexNode(relBuilder)) ++ 
unconvertedRexNodes)
    +            .reduce((l, r) => relBuilder.and(l, r))
    +      } else {
    +        null
    +      }
    +    }
    +
    +    // check whether we still need a RexProgram. An RexProgram is needed 
when either
    +    // projection or filter exists.
    +    val newScan = scan.copy(scan.getTraitSet, newTableSource)
    +    val newRexProgram = {
    +      if (remainingCondition != null || program.getProjectList.size() > 0) 
{
    --- End diff --
    
    Thanks for the tips, will change this.


> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to