Hi huaxin,

What I am concerned about is abstraction:

1. How to extend sources.Aggregation. Because Catalyst Expressions are
   recursive, defining a new parallel hierarchy is painful; I think the
   ScanBuilder must convert the pushed expressions into its own format
   (a rough sketch of what I mean follows below).
2. The optimizer rule is also an extension point, and I didn't see any
   consideration of join push down.

I also think SupportsPushDownRequiredColumns and SupportsPushDownFilters
are problematic. Obviously, file-based sources and SQL-based sources have
quite different push down capabilities, so I am not sure they can be
consolidated into one API.
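To make point 1 concrete, here is a rough sketch of the conversion I have
in mind. This is not the actual API: Aggregation, AggregateFunc and
SupportsPushDownAggregation are placeholders standing in for the shapes in
huaxin's PR, and the JDBC builder is purely illustrative.

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}

// A flat, source-facing representation of the pushed aggregation, as
// opposed to the recursive Catalyst Expression tree (all placeholders).
sealed trait AggregateFunc
case class Min(column: String) extends AggregateFunc
case class Max(column: String) extends AggregateFunc
case class Count(column: String) extends AggregateFunc
case class Aggregation(aggregates: Seq[AggregateFunc], groupBy: Seq[String])

trait SupportsPushDownAggregation extends ScanBuilder {
  // Returns true if the source can take over the whole aggregation.
  def pushAggregation(aggregation: Aggregation): Boolean
}

// A SQL-based source has to re-encode every pushed node into its own
// dialect, while a file-based source would map the same nodes onto, e.g.,
// Parquet footer statistics (min/max/row counts). That asymmetry is why
// I doubt the two kinds of sources fit one interface.
class ExampleJdbcScanBuilder extends SupportsPushDownAggregation {
  private var pushedAggSql: Option[String] = None

  override def pushAggregation(aggregation: Aggregation): Boolean = {
    val selectList = aggregation.aggregates.map {
      case Min(c)   => s"MIN($c)"
      case Max(c)   => s"MAX($c)"
      case Count(c) => s"COUNT($c)"
    } ++ aggregation.groupBy
    pushedAggSql = Some(selectList.mkString(", "))
    true
  }

  // A real implementation would build a Scan that issues
  // SELECT <pushedAggSql> ... GROUP BY <groupBy> against the database.
  override def build(): Scan = ???
}
```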
I will push my PR tomorrow, and after that, could we schedule a meeting to
discuss the API?

On Mon, Apr 5, 2021 at 2:24 AM huaxin gao <[email protected]> wrote:

> Hello Chang,
>
> Thanks for proposing the SPIP and initiating the discussion. However, I
> think the problem with your proposal is that it doesn't take file-based
> data sources such as Parquet, ORC, etc. into consideration. As far as I
> know, most Spark users have file-based data sources. As a matter of fact,
> I have customers waiting for Aggregate push down for Parquet. That's the
> reason for my current implementation, which takes a unified Aggregate
> push down approach for both file-based data sources and JDBC.
>
> I recently discussed this with several members of the Spark community,
> and we have agreed to break the Aggregate push down work into the
> following steps:
>
> 1. Implement Max, Min and Count push down in Parquet.
> 2. Add a new physical plan rewrite rule to remove the partial aggregate.
>    We can optimize one step further and also remove the ShuffleExchange
>    when the group by column and the partition column are the same.
> 3. Implement Max, Min and Count push down in JDBC.
> 4. Implement Sum and Avg push down in JDBC.
>
> I plan to implement Aggregate push down for Parquet first. The reasons
> are:
>
> 1. It's relatively easier to implement Parquet Aggregate push down than
>    JDBC:
>
>    1. We only need to implement Max, Min and Count.
>    2. There is no need to deal with the differences between Spark and
>       other databases. For example, aggregating decimal values behaves
>       differently across database implementations.
>
>    The main point is that we want to keep the PR minimal and support the
>    basic infrastructure for Aggregate push down first. The PR for
>    implementing Parquet Aggregate push down is already very big, and we
>    don't want one huge PR that solves all the problems; that would be
>    too hard to review.
>
> 2. I think it's too early to implement JDBC Aggregate push down.
>    Underneath, DS V2 JDBC still calls the DS V1 JDBC path, so if we
>    implemented JDBC Aggregate push down now, we would still need to add
>    a *trait PrunedFilteredAggregateScan* to V1 JDBC. One of the major
>    motivations for having DS V2 is to make new operator push down easier
>    to implement by avoiding the addition of new push down traits; adding
>    yet another push down trait to DS V1 JDBC would defeat the purpose of
>    having DS V2. So I want to wait until we have fully migrated to DS V2
>    JDBC, and then implement Aggregate push down for JDBC.
>
> I have submitted the Parquet Aggregate push down PR. Here is the link:
>
> https://github.com/apache/spark/pull/32049
>
> Thanks,
>
> Huaxin
>
> On Fri, Apr 2, 2021 at 1:04 AM Chang Chen <[email protected]> wrote:
>
>> The link is broken, so I have posted a PDF version.
>>
>> On Fri, Apr 2, 2021 at 3:57 PM Chang Chen <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> We would like to post a SPIP for Datasource V2 SQL Push Down in Spark.
>>> Here is the document link:
>>>
>>> https://olapio.atlassian.net/wiki/spaces/TeamCX/pages/2667315361/Discuss+SQL+Data+Source+V2+SQL+Push+Down?atlOrigin=eyJpIjoiOTI5NGYzYWMzMWYwNDliOWIwM2ZkODllODk4Njk2NzEiLCJwIjoiYyJ9
>>>
>>> This SPIP aims to make push down more extensible.
>>>
>>> I would like to thank huaxin gao; my prototype is based on her PR. I
>>> will submit a PR ASAP.
>>>
>>> Thanks,
>>>
>>> Chang
>>
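P.S. To make huaxin's point 2 above concrete: the *trait
PrunedFilteredAggregateScan* would presumably mirror the existing
org.apache.spark.sql.sources.PrunedFilteredScan. Below is a minimal sketch
under that assumption; the trait name comes from her email, AggregateFunc
is a placeholder, and neither exists in Spark today.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter

// Placeholder for the pushed aggregate functions; not a real Spark type.
sealed trait AggregateFunc
case class Min(column: String) extends AggregateFunc
case class Max(column: String) extends AggregateFunc
case class Count(column: String) extends AggregateFunc

// Hypothetical V1 trait, same shape as PrunedFilteredScan but with the
// pushed aggregates threaded through as one more array. Every new push
// down capability in DS V1 costs another one-off trait like this, which
// is exactly what DS V2 is meant to avoid.
trait PrunedFilteredAggregateScan {
  def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter],
      aggregates: Array[AggregateFunc]): RDD[Row]
}
```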
