OK, I agree with that. How about we add a new interface to push down the query plan, based on the current framework? We can mark the query-plan push-down interface as unstable, to save the effort of designing a stable representation of the query plan and maintaining forward compatibility.
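As a very rough sketch of what such an unstable interface could look like (the names below are hypothetical and not part of the actual proposal), it could simply hand the data source the Catalyst-internal plan fragment and let it return whatever it cannot handle:

    import org.apache.spark.annotation.InterfaceStability;
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

    // Hypothetical sketch only, not an actual Spark interface. It reuses the
    // Catalyst-internal LogicalPlan instead of a new stable plan representation,
    // which is why it would have to be marked unstable.
    @InterfaceStability.Unstable
    public interface SupportsQueryPlanPushDown {
      // Spark offers the plan fragment it would like to push down; the data source
      // returns the part it cannot handle, which Spark still executes itself.
      LogicalPlan pushQueryPlan(LogicalPlan plan);
    }

Keeping it behind an unstable annotation sidesteps the compatibility question while still letting advanced sources opt in.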
On Wed, Aug 30, 2017 at 10:53 AM, James Baker <j.ba...@outlook.com> wrote:

> I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.
>
> I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading that much data.
>
> If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).
>
> There's also a thing where our typical use cases have many filters (20+ is common), so it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, so why can't I just pick that myself?
>
> The current design is friendly to simple datasources, but does not have the potential to support this.
>
> So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.
>
> My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write the simple mode in terms of it very easily. So we could actually provide two APIs: one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?
>
> James
>
> On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> Hi James,
>>
>> Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.
>>
>> > Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments
>>
>> The problem with this approach is: 1) if we want to add more arguments in the future, it's really hard to do without changing the existing interface; 2) if a user wants to implement a very simple data source, they have to look at all the arguments and understand them, which may be a burden. I don't have a solution to these two problems; comments are welcome.
>>
>> > There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.
>>
>> This is true: the current framework applies push-downs one by one, incrementally. If a data source wants to go back and accept a sort push-down after it has accepted a filter push-down, that's impossible with the current data source V2. Fortunately, we have a solution for this problem.
>> On the Spark side, we actually do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know the best order in which to push them into the data source. Spark can try every combination and ask the data source to report a cost, then pick the combination with the lowest cost. This can also be implemented as a cost-report interface, so that an advanced data source can implement it for optimal performance, while a simple data source doesn't need to care about it and can stay simple.
>>
>> The current design is very friendly to simple data sources, and has the potential to support complex data sources; I prefer the current design over the plan push-down one. What do you think?
>>
>> On Wed, Aug 30, 2017 at 5:53 AM, James Baker <j.ba...@outlook.com> wrote:
>>
>>> Yeah, for sure.
>>>
>>> With the stable representation - agree that in the general case this is pretty intractable; it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which, after all, need to be translatable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?
>>>
>>> That said, what would also be fine for us is a place to plug into an unstable query plan.
>>>
>>> Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.
>>>
>>> Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.
>>>
>>> To your second question:
>>>
>>> The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.
>>>
>>> For example, the dataset
>>>
>>> a b c
>>> 1 2 3
>>> 1 3 3
>>> 1 3 4
>>> 2 1 1
>>> 2 0 1
>>>
>>> can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case I see where I can push down two equality filters iff I am given them at the same time, whilst not being able to when given them one at a time.
>>>
>>> There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.
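For concreteness, the cost-report interface Wenchen describes above might look roughly like the following (the names and the cost metric are invented for illustration; this is not part of the proposal):

    import java.util.List;
    import org.apache.spark.sql.catalyst.expressions.SortOrder;
    import org.apache.spark.sql.sources.Filter;

    // Hypothetical sketch: Spark proposes a combination of pushdowns and the data
    // source reports an estimated cost, letting Spark pick the cheapest combination.
    // The cost metric (e.g. bytes expected to be read) is an assumption for illustration.
    public interface SupportsCostReport {
      // Estimated cost of scanning with the given pushed filters and requested ordering.
      double estimateScanCost(List<Filter> pushedFilters, List<SortOrder> requiredOrdering);
    }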
>>> The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.
>>>
>>> For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do in buildScan, so I want to cache it. This means that I end up with code that looks like:
>>>
>>> public final class CachingFoo implements Foo {
>>>     private final Foo delegate;
>>>
>>>     private List<Filter> currentFilters = emptyList();
>>>     private Supplier<Bar> barSupplier = newSupplier(currentFilters);
>>>
>>>     public CachingFoo(Foo delegate) {
>>>         this.delegate = delegate;
>>>     }
>>>
>>>     private Supplier<Bar> newSupplier(List<Filter> filters) {
>>>         // Memoize so the expensive computation runs at most once per filter set.
>>>         return Suppliers.memoize(() -> delegate.computeBar(filters));
>>>     }
>>>
>>>     @Override
>>>     public Bar computeBar(List<Filter> filters) {
>>>         // Only recompute when Spark hands us a different set of filters.
>>>         if (!filters.equals(currentFilters)) {
>>>             currentFilters = filters;
>>>             barSupplier = newSupplier(filters);
>>>         }
>>>
>>>         return barSupplier.get();
>>>     }
>>> }
>>>
>>> which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result.
>>>
>>> This kind of cache becomes more prominent, but harder to deal with, in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc., so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.
>>>
>>> One thing that'd be great is a defined contract of the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).
>>>
>>> James
>>>
>>> On Tue, 29 Aug 2017 at 02:56 Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> James,
>>>>
>>>> Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For the maximum expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.
>>>>
>>>> The downsides to that (full query plan push down) are:
>>>>
>>>> 1. It is extremely difficult to design a stable representation for a logical / physical plan. It is doable, but we'd be the first to do it. I'm not aware of any mainstream database having been able to do that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or to offer an experimental trait without compatibility guarantees that uses the current Catalyst internal logical plan.
>>>>
>>>> 2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.
>>>>
>>>> Re: your point about the proposed v2 being worse than v1 for your use case.
>>>>
>>>> Can you say more?
>>>> You used the argument that in v2 there is more support for broader pushdown and that, as a result, it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the traits, it'd be logically identical to v1. I don't see why it would be worse or better, other than that v2 provides much stronger forward compatibility guarantees than v1.
>>>>
>>>> On Tue, Aug 29, 2017 at 4:54 AM, James Baker <j.ba...@outlook.com> wrote:
>>>>
>>>>> Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):
>>>>>
>>>>> Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.
>>>>>
>>>>> The general conclusion I've come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).
>>>>>
>>>>> In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed them in again with the unhandledFilters method.
>>>>>
>>>>> However, the filters that you can't handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I'm passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.
>>>>>
>>>>> This V2 API proposal has the same issues, but perhaps more so. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom: sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one by one can easily end up being more expensive than computing it all in one go.
>>>>>
>>>>> For some trivial examples:
>>>>>
>>>>> - After filtering, I might be sorted, whilst before filtering I might not be.
>>>>>
>>>>> - Filtering with certain filters might affect my ability to push down others.
>>>>>
>>>>> - Filtering with aggregations (as mooted) might not be possible to push down.
>>>>>
>>>>> And with the API as currently mooted, I need to be able to go back and change my results because they might change later.
>>>>>
>>>>> Really what would be good here is to pass all of the filters and sorts etc. all at once, and then I return the parts I can't handle.
>>>>>
>>>>> I'd prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don't want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable.
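A rough illustration of the "pass everything at once and return the unhandled parts" mechanism James asks for above (the names are invented; this is not a proposed Spark API):

    import java.util.List;
    import java.util.OptionalLong;
    import org.apache.spark.sql.catalyst.expressions.SortOrder;
    import org.apache.spark.sql.sources.Filter;

    // Hypothetical sketch only: the data source receives the complete set of
    // candidate pushdowns in a single call, transforms it, and returns whatever
    // it cannot handle, which Spark then executes itself.
    public interface SupportsFullPushdown {

      // A simple container for everything Spark would like to push down.
      final class PushdownSpec {
        public final List<Filter> filters;
        public final List<SortOrder> ordering;
        public final OptionalLong limit;

        public PushdownSpec(List<Filter> filters, List<SortOrder> ordering, OptionalLong limit) {
          this.filters = filters;
          this.ordering = ordering;
          this.limit = limit;
        }
      }

      // Given the full specification, return the parts that remain unhandled.
      PushdownSpec pushDown(PushdownSpec requested);
    }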
>>>>> With the mix-in approach, I don't think we can guarantee the properties we want without a two-phase thing - I'd really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.
>>>>>
>>>>> I think this ends up being a more elegant API for consumers, and also far more intuitive.
>>>>>
>>>>> James
>>>>>
>>>>> On Mon, 28 Aug 2017 at 18:00 蒋星博 <jiangxb1...@gmail.com> wrote:
>>>>>
>>>>>> +1 (Non-binding)
>>>>>>
>>>>>> On Mon, Aug 28, 2017 at 5:38 PM, Xiao Li <gatorsm...@gmail.com> wrote:
>>>>>>
>>>>>>> +1
>>>>>>>
>>>>>>> 2017-08-28 12:45 GMT-07:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>
>>>>>>>> Just wanted to point out that because the jira isn't labeled SPIP, it won't have shown up linked from
>>>>>>>>
>>>>>>>> http://spark.apache.org/improvement-proposals.html
>>>>>>>>
>>>>>>>> On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>>> > Hi all,
>>>>>>>> >
>>>>>>>> > It has been almost two weeks since I proposed the data source V2 for discussion, and we have already got some feedback on the JIRA ticket and the prototype PR, so I'd like to call for a vote.
>>>>>>>> >
>>>>>>>> > The full document of the Data Source API V2 is:
>>>>>>>> > https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>>>>>>>> >
>>>>>>>> > Note that this vote should focus on the high-level design/framework, not specific APIs, as we can always change/improve specific APIs during development.
>>>>>>>> >
>>>>>>>> > The vote will be up for the next 72 hours. Please reply with your vote:
>>>>>>>> >
>>>>>>>> > +1: Yeah, let's go forward and implement the SPIP.
>>>>>>>> > +0: Don't really care.
>>>>>>>> > -1: I don't think this is a good idea because of the following technical reasons.
>>>>>>>> >
>>>>>>>> > Thanks!