Maybe I'm missing something, but the high-level proposal consists of: Goals, Non-Goals, and Proposed API. What is there to discuss other than the details of the API that's being proposed? I think the goals make sense, but goals alone aren't enough to approve a SPIP.
On Wed, Aug 30, 2017 at 2:46 PM, Reynold Xin <r...@databricks.com> wrote:

> So we seem to be getting into a cycle of discussing more about the details of APIs than the high-level proposal. The details of APIs are important to debate, but those belong more in code reviews.
>
> One other important thing is that we should avoid API design by committee. While it is extremely useful to get feedback and understand the use cases, we cannot do API design by incorporating verbatim the union of everybody's feedback. API design is largely a tradeoff game. The most expressive API would also be harder to use, or sacrifice backward/forward compatibility. It is as important to decide what to exclude as what to include.
>
> Unlike the v1 API, the way Wenchen's high-level V2 framework is proposed makes it very easy to add new features (e.g. clustering properties) in the future without breaking any APIs. I'd rather ship something useful that might not be the most comprehensive set than debate every single feature we should add and end up creating something super complicated that has unclear value.
>
> On Wed, Aug 30, 2017 at 6:37 PM, Ryan Blue <rb...@netflix.com> wrote:
>
>> -1 (non-binding)
>>
>> Sometimes it takes a VOTE thread to get people to actually read and comment, so thanks for starting this one… but there’s still discussion happening on the prototype API, and the proposal hasn’t been updated to reflect it. I’d like to see the proposal shaped by the ongoing discussion so that we have a better, more concrete plan. I think that’s going to produce a better SPIP.
>>
>> The second reason for -1 is that I think the read- and write-side proposals should be separated. The PR <https://github.com/cloud-fan/spark/pull/10> currently has “write path” listed as a TODO item, and most of the discussion I’ve seen is on the read side. I think it would be better to separate the read and write APIs so we can focus on them individually.
>>
>> An example of why we should focus on the write path separately is that the proposal says this:
>>
>> Ideally partitioning/bucketing concept should not be exposed in the Data Source API V2, because they are just techniques for data skipping and pre-partitioning. However, these 2 concepts are already widely used in Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be consistent, we need to add partitioning/bucketing to Data Source V2 . . .
>>
>> Essentially, some of these APIs mix DDL and DML operations. I’d like to consider ways to fix that problem instead of carrying the problem forward to Data Source V2. We can solve this by adding a high-level API for DDL and a better write/insert API that works well with it. Clearly, that discussion is independent of the read path, which is why I think separating the two proposals would be a win.
>>
>> rb
>>
>> On Wed, Aug 30, 2017 at 4:28 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> That might be good to do, but it seems orthogonal to this effort itself. It would be a completely different interface.
>>>
>>> On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>>> OK, I agree with that. How about we add a new interface to push down the query plan, based on the current framework? We can mark the query-plan-push-down interface as unstable, to save the effort of designing a stable representation of the query plan and maintaining forward compatibility.
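(To make the unstable plan-push-down interface Wenchen suggests concrete, here is a minimal, purely illustrative sketch. Nothing below exists in the proposal or the prototype PR: the interface name and method are hypothetical, and it assumes Catalyst's internal LogicalPlan is handed to the source directly, with no compatibility guarantee.)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;

/**
 * Hypothetical, deliberately unstable mix-in, sketched for illustration only.
 * LogicalPlan is a Catalyst-internal class, so an interface like this could
 * not promise backward/forward compatibility, which is exactly the trade-off
 * Wenchen describes above.
 */
public interface SupportsPlanPushDown {

  /**
   * Spark offers the plan fragment sitting on top of this source's relation.
   * The implementation returns the residual plan it cannot handle (possibly
   * the input unchanged), and Spark evaluates that residual over the rows
   * the source produces.
   */
  LogicalPlan pushDownPlan(LogicalPlan fragment);
}

A source that understands nothing simply returns the fragment unchanged, so simple implementations stay simple, while a source like the one James describes can walk the fragment and claim filters, sorts, and limits in whatever order suits it.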
>>>>
>>>> On Wed, Aug 30, 2017 at 10:53 AM, James Baker <j.ba...@outlook.com> wrote:
>>>>
>>>>> I'll just focus on the one-by-one thing for now - it's the thing that blocks me the most.
>>>>>
>>>>> I think the place where we're most confused here is on the cost of determining whether I can push down a filter. For me, in order to work out whether I can push down a filter or satisfy a sort, I might have to read plenty of data. That said, it's worth me doing this because I can use this information to avoid reading that much data.
>>>>>
>>>>> If you give me all the orderings, I will have to read that data many times (we stream it to avoid keeping it in memory).
>>>>>
>>>>> There's also a thing where our typical use cases have many filters (20+ is common). So it's likely not going to work to pass us all the combinations. That said, if I can tell you a cost, I know what optimal looks like, so why can't I just pick that myself?
>>>>>
>>>>> The current design is friendly to simple datasources, but does not have the potential to support this.
>>>>>
>>>>> So the main problem we have with datasources v1 is that it's essentially impossible to leverage a bunch of Spark features - I don't get to use bucketing or row batches or all the nice things that I really want to use to get decent performance. Provided I can leverage these in a moderately supported way which won't break in any given commit, I'll be pretty happy with anything that lets me opt out of the restrictions.
>>>>>
>>>>> My suggestion here is that if you make a mode which works well for complicated use cases, you end up being able to write the simple mode in terms of it very easily. So we could actually provide two APIs: one that lets people who have more interesting datasources leverage the cool Spark features, and one that lets people who just want to implement basic features do that - I'd try to include some kind of layering here. I could probably sketch out something here if that'd be useful?
>>>>>
>>>>> James
>>>>>
>>>>> On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>
>>>>>> Hi James,
>>>>>>
>>>>>> Thanks for your feedback! I think your concerns are all valid, but we need to make a tradeoff here.
>>>>>>
>>>>>> > Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments
>>>>>>
>>>>>> The problem with this approach is: 1) if we want to add more arguments in the future, it's really hard to do without changing the existing interface; 2) if a user wants to implement a very simple data source, he has to look at all the arguments and understand them, which may be a burden for him. I don't have a solution to these 2 problems; comments are welcome.
>>>>>>
>>>>>> > There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.
>>>>>>
>>>>>> This is true: the current framework applies push-downs one by one, incrementally. If a data source wants to go back and accept a sort push-down after it has accepted a filter push-down, that's impossible with the current data source V2. Fortunately, we have a solution for this problem. On the Spark side we actually do have a fully specified set of arguments waiting to be pushed down, but Spark doesn't know the best order in which to push them into the data source. Spark can try every combination and ask the data source to report a cost, then pick the combination with the lowest cost. This can also be implemented as a cost-report interface, so that an advanced data source can implement it for optimal performance while a simple data source doesn't need to care about it and can stay simple.
>>>>>>
>>>>>> The current design is very friendly to simple data sources, and has the potential to support complex data sources, so I prefer the current design over the plan-push-down one. What do you think?
>>>>>>
>>>>>> On Wed, Aug 30, 2017 at 5:53 AM, James Baker <j.ba...@outlook.com> wrote:
>>>>>>
>>>>>>> Yeah, for sure.
>>>>>>>
>>>>>>> With the stable representation - agree that in the general case this is pretty intractable; it restricts the modifications that you can do in the future too much. That said, it shouldn't be as hard if you restrict yourself to the parts of the plan which are supported by the datasources V2 API (which, after all, need to be translatable properly into the future to support the mixins proposed). This should have a pretty small scope in comparison. As long as the user can bail out of nodes they don't understand, they should be ok, right?
>>>>>>>
>>>>>>> That said, what would also be fine for us is a place to plug into an unstable query plan.
>>>>>>>
>>>>>>> Explicitly here, what I'm looking for is a convenient mechanism to accept a fully specified set of arguments (of which I can choose to ignore some), and return the information as to which of them I'm ignoring. Taking a query plan of sorts is a way of doing this which IMO is intuitive to the user. It also provides a convenient location to plug in things like stats. Not at all married to the idea of using a query plan here; it just seemed convenient.
>>>>>>>
>>>>>>> Regarding the users who just want to be able to pump data into Spark, my understanding is that replacing isolated nodes in a query plan is easy. That said, our goal here is to be able to push down as much as possible into the underlying datastore.
>>>>>>>
>>>>>>> To your second question:
>>>>>>>
>>>>>>> The issue is that if you build up pushdowns incrementally and not all at once, you end up having to reject pushdowns and filters that you actually can do, which unnecessarily increases overheads.
>>>>>>>
>>>>>>> For example, the dataset
>>>>>>>
>>>>>>> a b c
>>>>>>> 1 2 3
>>>>>>> 1 3 3
>>>>>>> 1 3 4
>>>>>>> 2 1 1
>>>>>>> 2 0 1
>>>>>>>
>>>>>>> can efficiently push down sort(b, c) if I have already applied the filter a = 1, but otherwise will force a sort in Spark. On the PR I detail a case where I can push down two equality filters iff I am given them at the same time, whilst not being able to handle them one at a time.
>>>>>>>
>>>>>>> There are loads of cases like this - you can imagine someone being able to push down a sort before a filter is applied, but not afterwards. However, maybe the filter is so selective that it's better to push down the filter and not handle the sort. I don't get to make this decision, Spark does (but doesn't have good enough information to do it properly, whilst I do). I want to be able to choose the parts I push down given knowledge of my datasource - as defined the APIs don't let me do that, they're strictly more restrictive than the V1 APIs in this way.
>>>>>>>
>>>>>>> The pattern of not considering things that can be done in bulk bites us in other ways. The retrieval methods end up being trickier to implement than is necessary because frequently a single operation provides the result of many of the getters, but the state is mutable, so you end up with odd caches.
>>>>>>>
>>>>>>> For example, the work I need to do to answer unhandledFilters in V1 is roughly the same as the work I need to do for buildScan, so I want to cache it. This means that I end up with code that looks like:
>>>>>>>
>>>>>>> public final class CachingFoo implements Foo {
>>>>>>>   private final Foo delegate;
>>>>>>>
>>>>>>>   private List<Filter> currentFilters = emptyList();
>>>>>>>   private Supplier<Bar> barSupplier = newSupplier(currentFilters);
>>>>>>>
>>>>>>>   public CachingFoo(Foo delegate) {
>>>>>>>     this.delegate = delegate;
>>>>>>>   }
>>>>>>>
>>>>>>>   private Supplier<Bar> newSupplier(List<Filter> filters) {
>>>>>>>     return Suppliers.memoize(() -> delegate.computeBar(filters));
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Bar computeBar(List<Filter> filters) {
>>>>>>>     if (!filters.equals(currentFilters)) {
>>>>>>>       currentFilters = filters;
>>>>>>>       barSupplier = newSupplier(filters);
>>>>>>>     }
>>>>>>>
>>>>>>>     return barSupplier.get();
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> which caches the result required in unhandledFilters on the expectation that Spark will call buildScan afterwards and get to use the result.
>>>>>>>
>>>>>>> This kind of cache becomes more prominent, but harder to deal with, in the new APIs. As one example here, the state I will need in order to compute accurate column stats internally will likely be a subset of the work required in order to get the read tasks, tell you if I can handle filters, etc., so I'll want to cache them for reuse. However, the cached information needs to be appropriately invalidated when I add a new filter or sort order or limit, and this makes implementing the APIs harder and more error-prone.
>>>>>>>
>>>>>>> One thing that'd be great is a defined contract for the order in which Spark calls the methods on your datasource (ideally this contract could be implied by the way the Java class structure works, but otherwise I can just throw).
>>>>>>>
>>>>>>> James
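(As an illustration of the "pass everything at once, return what you can't handle" contract James is asking for, here is one hypothetical shape it could take. None of these types are in the proposal; Filter is the existing org.apache.spark.sql.sources.Filter, and everything else is invented for this sketch.)

import java.util.List;
import java.util.OptionalLong;

import org.apache.spark.sql.sources.Filter;

/**
 * Hypothetical "all at once" negotiation, sketched for illustration only.
 * Spark offers every candidate pushdown in a single call, and the data source
 * answers with the residual it cannot handle, which Spark applies itself.
 */
public interface AcceptsFullPushdown {

  /** Everything Spark would like to push down for a single scan. */
  final class PushdownRequest {
    public final List<Filter> filters;
    public final List<String> sortColumns; // simplified stand-in for a real sort order
    public final OptionalLong limit;

    public PushdownRequest(List<Filter> filters, List<String> sortColumns, OptionalLong limit) {
      this.filters = filters;
      this.sortColumns = sortColumns;
      this.limit = limit;
    }
  }

  /** The work the source declined; Spark evaluates these on top of the scan. */
  final class PushdownResidual {
    public final List<Filter> unhandledFilters;
    public final boolean sortUnhandled;
    public final boolean limitUnhandled;

    public PushdownResidual(List<Filter> unhandledFilters, boolean sortUnhandled, boolean limitUnhandled) {
      this.unhandledFilters = unhandledFilters;
      this.sortUnhandled = sortUnhandled;
      this.limitUnhandled = limitUnhandled;
    }
  }

  /** One negotiation step: offer everything, learn what remains for Spark to do. */
  PushdownResidual pushdown(PushdownRequest request);
}

Because the source sees the whole request in one call, it can make the filter-versus-sort trade-off itself and compute its scan plan exactly once, which removes the need for the CachingFoo-style memoization above.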
>>>>>>>
>>>>>>> On Tue, 29 Aug 2017 at 02:56 Reynold Xin <r...@databricks.com> wrote:
>>>>>>>
>>>>>>>> James,
>>>>>>>>
>>>>>>>> Thanks for the comment. I think you just pointed out a trade-off between expressiveness and API simplicity, compatibility and evolvability. For maximum expressiveness, we'd want the ability to expose full query plans, and let the data source decide which part of the query plan can be pushed down.
>>>>>>>>
>>>>>>>> The downsides of that (full query plan push-down) are:
>>>>>>>>
>>>>>>>> 1. It is extremely difficult to design a stable representation for the logical / physical plan. It is doable, but we'd be the first to do it. I'm not aware of any mainstream databases having done that in the past. The design of that API itself, to make sure we have a good story for backward and forward compatibility, would probably take months if not years. It might still be good to do, or to offer an experimental trait, without compatibility guarantees, that uses the current Catalyst internal logical plan.
>>>>>>>>
>>>>>>>> 2. Most data source developers simply want a way to offer some data, without any pushdown. Having to understand query plans is a burden rather than a gift.
>>>>>>>>
>>>>>>>> Re: your point about the proposed v2 being worse than v1 for your use case.
>>>>>>>>
>>>>>>>> Can you say more? You used the argument that in v2 there is more support for broader pushdown and as a result it is harder to implement. That's how it is supposed to be. If a data source simply implements one of the traits, it'd be logically identical to v1. I don't see why it would be worse or better, other than that v2 provides much stronger forward-compatibility guarantees than v1.
>>>>>>>>
>>>>>>>> On Tue, Aug 29, 2017 at 4:54 AM, James Baker <j.ba...@outlook.com> wrote:
>>>>>>>>
>>>>>>>>> Copying from the code review comments I just submitted on the draft API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):
>>>>>>>>>
>>>>>>>>> Context here is that I've spent some time implementing a Spark datasource and have had some issues with the current API which are made worse in V2.
>>>>>>>>>
>>>>>>>>> The general conclusion I’ve come to here is that this is very hard to actually implement (in a similar but more aggressive way than DataSource V1, because of the extra methods and dimensions we get in V2).
>>>>>>>>>
>>>>>>>>> In DataSources V1 PrunedFilteredScan, the issue is that you are passed in the filters with the buildScan method, and then passed them in again with the unhandledFilters method.
>>>>>>>>>
>>>>>>>>> However, the filters that you can’t handle might be data dependent, which the current API does not handle well. Suppose I can handle filter A some of the time, and filter B some of the time. If I’m passed in both, then either A and B are unhandled, or A, or B, or neither. The work I have to do to work this out is essentially the same as I have to do while actually generating my RDD (essentially I have to generate my partitions), so I end up doing some weird caching work.
>>>>>>>>>
>>>>>>>>> This V2 API proposal has the same issues, but perhaps more so. In PrunedFilteredScan, there is essentially one degree of freedom for pruning (filters), so you just have to implement caching between unhandledFilters and buildScan. However, here we have many degrees of freedom: sorts, individual filters, clustering, sampling, maybe aggregations eventually - and these operations are not all commutative, and computing my support one-by-one can easily end up being more expensive than computing it all in one go.
>>>>>>>>>
>>>>>>>>> For some trivial examples:
>>>>>>>>>
>>>>>>>>> - After filtering, I might be sorted, whilst before filtering I might not be.
>>>>>>>>>
>>>>>>>>> - Filtering with certain filters might affect my ability to push down others.
>>>>>>>>>
>>>>>>>>> - Filtering with aggregations (as mooted) might not be possible to push down.
>>>>>>>>>
>>>>>>>>> And with the API as currently mooted, I need to be able to go back and change my results because they might change later.
>>>>>>>>>
>>>>>>>>> Really what would be good here is to pass all of the filters and sorts etc. all at once, and then I return the parts I can’t handle.
>>>>>>>>>
>>>>>>>>> I’d prefer in general that this be implemented by passing some kind of query plan to the datasource which enables this kind of replacement. Explicitly don’t want to give the whole query plan - that sounds painful - would prefer we push down only the parts of the query plan we deem to be stable. With the mix-in approach, I don’t think we can guarantee the properties we want without a two-phase thing - I’d really love to be able to just define a straightforward union type which is our supported pushdown stuff, and then the user can transform and return it.
>>>>>>>>>
>>>>>>>>> I think this ends up being a more elegant API for consumers, and also far more intuitive.
>>>>>>>>>
>>>>>>>>> James
>>>>>>>>>
>>>>>>>>> On Mon, 28 Aug 2017 at 18:00 蒋星博 <jiangxb1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> +1 (Non-binding)
>>>>>>>>>>
>>>>>>>>>> Xiao Li <gatorsm...@gmail.com> wrote on Mon, Aug 28, 2017 at 5:38 PM:
>>>>>>>>>>
>>>>>>>>>>> +1
>>>>>>>>>>>
>>>>>>>>>>> 2017-08-28 12:45 GMT-07:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>>>>>
>>>>>>>>>>>> Just wanted to point out that because the jira isn't labeled SPIP, it won't have shown up linked from
>>>>>>>>>>>>
>>>>>>>>>>>> http://spark.apache.org/improvement-proposals.html
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>>>>>>> > Hi all,
>>>>>>>>>>>> >
>>>>>>>>>>>> > It has been almost 2 weeks since I proposed the data source V2 for discussion, and we have already got some feedback on the JIRA ticket and the prototype PR, so I'd like to call for a vote.
>>>>>>>>>>>> >
>>>>>>>>>>>> > The full document of the Data Source API V2 is:
>>>>>>>>>>>> > https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>>>>>>>>>>>> >
>>>>>>>>>>>> > Note that this vote should focus on the high-level design/framework, not specific APIs, as we can always change/improve specific APIs during development.
>>>>>>>>>>>> >
>>>>>>>>>>>> > The vote will be up for the next 72 hours. Please reply with your vote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > +1: Yeah, let's go forward and implement the SPIP.
>>>>>>>>>>>> > +0: Don't really care.
>>>>>>>>>>>> > -1: I don't think this is a good idea because of the following technical reasons.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix

--
Ryan Blue
Software Engineer
Netflix