Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1237#issuecomment-162553706
  
    Yes, we should, this has been lying around for a while now.
    
    I have been thinking about this for a while now and I'm afraid I cannot 
recommend to merge this in the current form. This PR is tackling two issues: 1) 
adding new interfaces for Table API input formats, 2) pushing filter predicates 
into sources. 1) will end up being a question of style, i.e. whether we want to 
have two input interfaces, or just one that can do both. 
    
    On 2), I think we need a different approach. The approach taken in this PR 
is to enhance the Rules that analyze/transform expression with knowledge about 
the input operation. This allows to backtrack fields to the sources and notify 
the source of used fields/predicates. I think, however, that this is not the 
correct level to implement it. The plan of operations is a Tree itself and can 
also be analyzed/transformed using the Tree framework of the Table API. I think 
it would be beneficial to provide analysis steps that can actually push filters 
towards the sources. (And then fuse adaptive source and filter if possible) 
This will be helpful even if no adaptive sources are present and it will 
transform one valid tree of the user query into another valid tree of the user 
query which makes it easy to see the optimizations that are taking place and to 
debug it.
    
    To show what I mean I implemented a small prototype that prunes unused 
fields of selection expressions and pushes filters towards sources: 
https://github.com/aljoscha/flink/tree/table-filter-push. There is an example 
in there: `FilterPushExample.scala`, the code for tree analysis is in 
`PlanAnalyzer.scala`. The example query is this:
    ```
    val input = env.fromElements((1, 1, 1), (2, 2, 2), (4, 4, 4), (5, 5, 5))
    
        input
          .as('a, 'b, 'c)
          .select('a as 'd, 'b, 'c + 15 as 'e)
          .filter('d > 3)
          .select('d)
          .toDataSet[Row].print
    ```
    
    if the example is run it will print the operation tree after tree analysis 
phases, this is the printout:
    ```
    Before PushFilter: 
Select(Filter(Select(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a as 'd,'b,('c + 15) as 'e), 'd > 3), 'd)
    After PushFilter: 
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd,'b,('c + 15) as 'e), 'd)
    Before prune: 
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd,'b,('c + 15) as 'e), 'd)
    After prune: 
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer), 
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd), 'd)
    ROOT used fields: a
    
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to