[ 
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045061#comment-15045061
 ] 

ASF GitHub Bot commented on FLINK-2828:
---------------------------------------

Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1237#issuecomment-162553706
  
    Yes, we should; this has been lying around for a while now.
    
    I have been thinking about this for a while, and I'm afraid I cannot 
recommend merging this in its current form. This PR tackles two issues: 1) 
adding new interfaces for Table API input formats, and 2) pushing filter 
predicates into sources. 1) will end up being a question of style, i.e. whether 
we want two input interfaces or just one that can do both. 
    
    On 2), I think we need a different approach. The approach taken in this PR 
is to enhance the Rules that analyze/transform expressions with knowledge about 
the input operation. This allows backtracking fields to the sources and 
notifying each source of the fields/predicates used. I think, however, that 
this is not the correct level at which to implement it. The plan of operations 
is itself a Tree and can also be analyzed/transformed using the Tree framework 
of the Table API. I think it would be beneficial to provide analysis steps that 
actually push filters towards the sources (and then fuse the adaptive source 
and the filter if possible). This is helpful even if no adaptive sources are 
present, and it transforms one valid tree of the user query into another valid 
tree of the user query, which makes it easy to see the optimizations that are 
taking place and to debug them.
    
    To show what I mean, I implemented a small prototype that prunes unused 
fields of selection expressions and pushes filters towards sources: 
https://github.com/aljoscha/flink/tree/table-filter-push. It contains an 
example, `FilterPushExample.scala`; the code for tree analysis is in 
`PlanAnalyzer.scala`. The example query is this:
    ```
    val input = env.fromElements((1, 1, 1), (2, 2, 2), (4, 4, 4), (5, 5, 5))

    input
      .as('a, 'b, 'c)
      .select('a as 'd, 'b, 'c + 15 as 'e)
      .filter('d > 3)
      .select('d)
      .toDataSet[Row].print
    ```
    
    If the example is run, it prints the operation tree before and after each 
tree analysis phase. This is the printout:
    ```
    Before PushFilter: 
Select(Filter(Select(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a as 'd,'b,('c + 15) as 'e), 'd > 3), 'd)
    After PushFilter: 
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd,'b,('c + 15) as 'e), 'd)
    Before prune: 
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd,'b,('c + 15) as 'e), 'd)
    After prune: 
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd), 'd)
    ROOT used fields: a
    
    ```
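
    The two phases in the printout can be sketched on a toy tree. This is only 
a minimal illustration, assuming simplified stand-in classes (`Expr`, `Plan`, 
`PlanSketch` and their members are hypothetical names, not the Table API 
classes and not the code in `PlanAnalyzer.scala`). It pushes a filter below a 
projection by rewriting the predicate in terms of the projection's inputs, and 
then drops projected columns that nothing above them uses:

```scala
// Hypothetical mini plan tree. The names echo the printout, but these are
// NOT the Table API classes or the prototype's PlanAnalyzer code.
sealed trait Expr
case class Field(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Gt(left: Expr, right: Expr) extends Expr
case class Plus(left: Expr, right: Expr) extends Expr

sealed trait Plan
case class Root(fields: Seq[String]) extends Plan
// A projection: each output name is bound to the expression that computes it.
case class Select(input: Plan, columns: Seq[(String, Expr)]) extends Plan
case class Filter(input: Plan, predicate: Expr) extends Plan

object PlanSketch {
  // Replace field references by the expressions that define them.
  def substitute(e: Expr, defs: Map[String, Expr]): Expr = e match {
    case Field(n)   => defs.getOrElse(n, e)
    case Gt(l, r)   => Gt(substitute(l, defs), substitute(r, defs))
    case Plus(l, r) => Plus(substitute(l, defs), substitute(r, defs))
    case lit: Lit   => lit
  }

  // Names of all fields an expression reads.
  def fieldsOf(e: Expr): Set[String] = e match {
    case Field(n)   => Set(n)
    case Gt(l, r)   => fieldsOf(l) ++ fieldsOf(r)
    case Plus(l, r) => fieldsOf(l) ++ fieldsOf(r)
    case _: Lit     => Set.empty
  }

  // Move each Filter below the Selects underneath it, rewriting the
  // predicate so that it refers to the Select's input fields.
  def pushFilter(p: Plan): Plan = p match {
    case Filter(in, pred) => pushFilter(in) match {
      case Select(below, cols) =>
        Select(pushFilter(Filter(below, substitute(pred, cols.toMap))), cols)
      case other => Filter(other, pred)
    }
    case Select(in, cols) => Select(pushFilter(in), cols)
    case r: Root          => r
  }

  // Drop Select columns that no operation above them needs.
  // `needed = None` means "keep everything" (the top of the plan).
  def prune(p: Plan, needed: Option[Set[String]] = None): Plan = p match {
    case Select(in, cols) =>
      val kept = cols.filter { case (name, _) => needed.forall(_.contains(name)) }
      Select(prune(in, Some(kept.flatMap(c => fieldsOf(c._2)).toSet)), kept)
    case Filter(in, pred) =>
      Filter(prune(in, needed.map(_ ++ fieldsOf(pred))), pred)
    case r: Root => r
  }
}
```

    Applied to a tree shaped like the example query, `PlanSketch.pushFilter` 
turns `'d > 3` above the first projection into `'a > 3` directly above the 
root, and `PlanSketch.prune` then reduces that projection to `'a as 'd`. Both 
steps take a valid tree to another valid tree, so the intermediate results can 
be printed and inspected. The substitution step assumes that projection 
expressions are deterministic.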


> Add interfaces for Table API input formats
> ------------------------------------------
>
>                 Key: FLINK-2828
>                 URL: https://issues.apache.org/jira/browse/FLINK-2828
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> In order to support input formats for the Table API, interfaces are 
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the 
> plan. Although the output schema stays the same, the TableSource can react to 
> field resolution and/or predicates internally and can return adapted 
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional 
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment 
> (common super class of all ExecutionEnvironments for DataSets and 
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering,
> // false = selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false),
> //      ("c", true))
> // Result resolved fields for tableSource2 (true = filtering,
> // false = selection):
> //  Set(("a", true), ("c", true))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
