[
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15045061#comment-15045061
]
ASF GitHub Bot commented on FLINK-2828:
---------------------------------------
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1237#issuecomment-162553706
Yes, we should; this has been lying around for a while now.
I have been thinking about this for a while, and I'm afraid I cannot
recommend merging this in its current form. This PR tackles two issues: 1)
adding new interfaces for Table API input formats, and 2) pushing filter
predicates into sources. 1) will end up being a question of style, i.e. whether
we want to have two input interfaces or just one that can do both.
On 2), I think we need a different approach. The approach taken in this PR
is to enhance the Rules that analyze/transform expressions with knowledge about
the input operation. This makes it possible to trace fields back to the sources
and notify each source of the fields/predicates used. I think, however, that
this is not the correct level at which to implement it. The plan of operations
is itself a Tree and can also be analyzed/transformed using the Tree framework
of the Table API. I think it would be beneficial to provide analysis steps that
actually push filters towards the sources (and then fuse the adaptive source
and the filter if possible).
This will be helpful even if no adaptive sources are present, and it
transforms one valid tree of the user query into another valid tree of the user
query, which makes it easy to see the optimizations that are taking place and to
debug them.
To show what I mean, I implemented a small prototype that prunes unused
fields of selection expressions and pushes filters towards sources:
https://github.com/aljoscha/flink/tree/table-filter-push. There is an example
in there, `FilterPushExample.scala`; the code for tree analysis is in
`PlanAnalyzer.scala`. The example query is this:
```
val input = env.fromElements((1, 1, 1), (2, 2, 2), (4, 4, 4), (5, 5, 5))
input
  .as('a, 'b, 'c)
  .select('a as 'd, 'b, 'c + 15 as 'e)
  .filter('d > 3)
  .select('d)
  .toDataSet[Row].print
```
If the example is run, it prints the operation tree before and after each tree
analysis phase. This is the printout:
```
Before PushFilter:
Select(Filter(Select(Root(MapOperator@49c43f4e,ArraySeq((a,Integer),
(b,Integer), (c,Integer))), 'a as 'd,'b,('c + 15) as 'e), 'd > 3), 'd)
After PushFilter:
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer),
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd,'b,('c + 15) as 'e), 'd)
Before prune:
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer),
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd,'b,('c + 15) as 'e), 'd)
After prune:
Select(Select(Filter(Root(MapOperator@49c43f4e,ArraySeq((a,Integer),
(b,Integer), (c,Integer))), 'a > 3), 'a as 'd), 'd)
ROOT used fields: a
```
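The PushFilter step in the printout above can be illustrated with a minimal, self-contained sketch. This is not the code from `PlanAnalyzer.scala`; every name in it (`Expr`, `Plan`, `PushFilterSketch`, `pushFilter`, and so on) is hypothetical and chosen only for illustration, and it assumes projection expressions are deterministic so that a predicate may be rewritten in terms of the inputs of a `Select`.

```scala
// Hypothetical, simplified model of expressions and plan nodes.
// These are NOT the Table API classes or the prototype's classes.
sealed trait Expr
case class Field(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Plus(left: Expr, right: Expr) extends Expr
case class Gt(left: Expr, right: Expr) extends Expr
case class Alias(child: Expr, name: String) extends Expr

sealed trait Plan
case class Root(fields: Seq[String]) extends Plan
case class Select(input: Plan, exprs: Seq[Expr]) extends Plan
case class Filter(input: Plan, predicate: Expr) extends Plan

object PushFilterSketch {
  // Name under which a projection is visible to operators above the Select.
  private def outputName(e: Expr): Option[String] = e match {
    case Alias(_, n) => Some(n)
    case Field(n)    => Some(n)
    case _           => None
  }

  // Expression over the Select's input that defines the projection.
  private def definition(e: Expr): Expr = e match {
    case Alias(child, _) => child
    case other           => other
  }

  // Replace each field reference with the expression that defines it.
  // Unknown names are left untouched (such a query would be invalid anyway).
  def substitute(e: Expr, defs: Map[String, Expr]): Expr = e match {
    case Field(n)    => defs.getOrElse(n, e)
    case Plus(l, r)  => Plus(substitute(l, defs), substitute(r, defs))
    case Gt(l, r)    => Gt(substitute(l, defs), substitute(r, defs))
    case Alias(c, n) => Alias(substitute(c, defs), n)
    case lit: Lit    => lit
  }

  // Rewrite Filter(Select(in, exprs), p) into Select(Filter(in, p'), exprs),
  // where p' is p expressed over the fields of `in`. Applied recursively,
  // this moves filters as close to the Root as possible.
  def pushFilter(plan: Plan): Plan = plan match {
    case Filter(Select(in, exprs), pred) =>
      val defs = exprs.flatMap(e => outputName(e).map(_ -> definition(e))).toMap
      Select(pushFilter(Filter(in, substitute(pred, defs))), exprs)
    case Filter(in, pred) =>
      pushFilter(in) match {
        // Pushing the inner filter exposed a Select: keep pushing this one.
        case s: Select => pushFilter(Filter(s, pred))
        case other     => Filter(other, pred)
      }
    case Select(in, exprs) => Select(pushFilter(in), exprs)
    case root: Root        => root
  }
}
```

Applied to a tree shaped like the example query, `pushFilter` turns `Select(Filter(Select(Root, ...), 'd > 3), 'd)` into `Select(Select(Filter(Root, 'a > 3), ...), 'd)`, which has the same shape as the "After PushFilter" line. Because the input and the output are both ordinary plan trees, the rewrite can be printed and inspected at each step.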
> Add interfaces for Table API input formats
> ------------------------------------------
>
> Key: FLINK-2828
> URL: https://issues.apache.org/jira/browse/FLINK-2828
> Project: Flink
> Issue Type: New Feature
> Components: Table API
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> In order to support input formats for the Table API, interfaces are
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the
> plan. Although the output schema stays the same, the TableSource can react to
> field resolution and/or predicates internally and can return adapted
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment
> (common super class of all ExecutionEnvironments for DataSets and
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
> .filter("a===5 || a===6")
> .select("a as a4, b as b4, c as c4")
> .filter("b4===7")
> .join(getTableJava(tableSource2))
> .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> // List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> // List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, false = selection):
> // Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, false = selection):
> // Set(("a", true), ("c", true))
> {code}