OK, I agree. How about we add a new interface to push down the query
plan, built on the current framework? We can mark the query-plan push-down
interface as unstable, which saves us the effort of designing a stable
representation of the query plan and maintaining forward compatibility.

On Wed, Aug 30, 2017 at 10:53 AM, James Baker <j.ba...@outlook.com> wrote:

> I'll just focus on the one-by-one thing for now - it's the thing that
> blocks me the most.
>
> I think the place where we're most confused here is on the cost of
> determining whether I can push down a filter. For me, in order to work out
> whether I can push down a filter or satisfy a sort, I might have to read
> plenty of data. That said, it's worth me doing this because I can use this
> information to avoid reading far more data than that.
>
> If you give me all the orderings, I will have to read that data many times
> (we stream it to avoid keeping it in memory).
>
> There's also a thing where our typical use cases have many filters (20+ is
> common). So, it's likely not going to work to pass us all the combinations.
> That said, if I can tell you a cost, I know what optimal looks like, why
> can't I just pick that myself?
>
> The current design is friendly to simple datasources, but does not have
> the potential to support this.
>
> So the main problem we have with datasources v1 is that it's essentially
> impossible to leverage a bunch of Spark features - I don't get to use
> bucketing or row batches or all the nice things that I really want to use
> to get decent performance. Provided I can leverage these in a moderately
> supported way which won't break in any given commit, I'll be pretty happy
> with anything that lets me opt out of the restrictions.
>
> My suggestion here is that if you make a mode which works well for
> complicated use cases, you end up being able to write simple mode in terms
> of it very easily. So we could actually provide two APIs, one that lets
> people who have more interesting datasources leverage the cool Spark
> features, and one that lets people who just want to implement basic
> features do that - I'd try to include some kind of layering here. I could
> probably sketch out something here if that'd be useful?
>
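The layering suggested above can be sketched roughly as follows. This is only an illustration under stated assumptions: `AdvancedSource`, `SimpleSource` and `adapt` are hypothetical names that appear in neither the proposal nor Spark, and filters are plain strings to keep the example short. The point is that a simple source can be adapted to the richer interface by declaring every filter unhandled.

```java
import java.util.Arrays;
import java.util.List;

final class LayeringSketch {
    // Hypothetical advanced API: sees all requested filters and returns
    // the ones it will NOT apply itself.
    interface AdvancedSource {
        List<String> unhandledFilters(List<String> requested);
        List<String> scan();
    }

    // Hypothetical simple API: just produce rows, no pushdown at all.
    interface SimpleSource {
        List<String> rows();
    }

    // Simple mode written in terms of the advanced one:
    // handle nothing, scan everything.
    static AdvancedSource adapt(final SimpleSource simple) {
        return new AdvancedSource() {
            @Override
            public List<String> unhandledFilters(List<String> requested) {
                return requested; // every filter is left for the engine
            }

            @Override
            public List<String> scan() {
                return simple.rows();
            }
        };
    }

    public static void main(String[] args) {
        AdvancedSource source = adapt(() -> Arrays.asList("row1", "row2"));
        System.out.println(source.unhandledFilters(Arrays.asList("a = 1")));
        System.out.println(source.scan());
    }
}
```

With this shape, only sources that want pushdown need to implement the advanced interface directly.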
> James
>
> On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> Hi James,
>>
>> Thanks for your feedback! I think your concerns are all valid, but we
>> need to make a tradeoff here.
>>
>> > Explicitly here, what I'm looking for is a convenient mechanism to
>> > accept a fully specified set of arguments
>>
>> The problems with this approach are: 1) if we want to add more arguments in
>> the future, it's really hard to do without changing the existing interface;
>> 2) if a user wants to implement a very simple data source, they have to look
>> at all the arguments and understand them, which may be a burden.
>> I don't have a solution to these 2 problems; comments are welcome.
>>
>>
>> > There are loads of cases like this - you can imagine someone being
>> > able to push down a sort before a filter is applied, but not afterwards.
>> > However, maybe the filter is so selective that it's better to push down the
>> > filter and not handle the sort. I don't get to make this decision, Spark
>> > does (but doesn't have good enough information to do it properly, whilst I
>> > do). I want to be able to choose the parts I push down given knowledge of
>> > my datasource - as defined the APIs don't let me do that, they're strictly
>> > more restrictive than the V1 APIs in this way.
>>
>> This is true, the current framework applies push downs one by one,
>> incrementally. If a data source wants to go back and accept a sort push down
>> after it has accepted a filter push down, that's impossible with the current
>> data source V2.
>> Fortunately, we have a solution for this problem. On the Spark side, we
>> actually do have a fully specified set of arguments waiting to be pushed down,
>> but Spark doesn't know the best order in which to push them into the data
>> source. Spark can try every combination and ask the data source to report a
>> cost, then pick the combination with the lowest cost. This
>> can also be implemented as a cost report interface, so that advanced data
>> sources can implement it for optimal performance, while simple data sources
>> don't need to care about it and can stay simple.
>>
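The "try every combination and pick the lowest cost" idea can be sketched as below. This is a rough illustration, not the proposed API: `CostReporter` and `cheapest` are hypothetical names, the costs are made up, and the sketch only enumerates which operators are pushed down, not the order in which they are applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CostReportSketch {
    // Hypothetical cost-report interface: the source estimates the cost of
    // a scan in which exactly the given operators are pushed down.
    interface CostReporter {
        double cost(List<String> pushedDown);
    }

    // Enumerates every subset of the candidates and keeps the cheapest one.
    static List<String> cheapest(List<String> candidates, CostReporter source) {
        List<String> best = new ArrayList<>();
        double bestCost = source.cost(best); // baseline: push nothing down
        for (int mask = 1; mask < (1 << candidates.size()); mask++) {
            List<String> subset = new ArrayList<>();
            for (int i = 0; i < candidates.size(); i++) {
                if ((mask & (1 << i)) != 0) {
                    subset.add(candidates.get(i));
                }
            }
            double c = source.cost(subset);
            if (c < bestCost) {
                best = subset;
                bestCost = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Made-up costs: the sort only pays off once the filter is applied.
        CostReporter toy = pushed -> {
            boolean f = pushed.contains("filter a = 1");
            boolean s = pushed.contains("sort (b, c)");
            if (f && s) return 10.0;
            if (f) return 20.0;
            if (s) return 500.0;
            return 100.0;
        };
        // prints "[filter a = 1, sort (b, c)]"
        System.out.println(cheapest(Arrays.asList("filter a = 1", "sort (b, c)"), toy));
    }
}
```

The loop makes 2^n cost calls for n candidates, so 20 filters alone already mean 1,048,576 calls. That is the scaling concern raised in the reply about sources with 20+ filters.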
>>
>> The current design is very friendly to simple data sources and has the
>> potential to support complex data sources, so I prefer the current design
>> over the plan push down one. What do you think?
>>
>>
>> On Wed, Aug 30, 2017 at 5:53 AM, James Baker <j.ba...@outlook.com> wrote:
>>
>>> Yeah, for sure.
>>>
>>> With the stable representation - agree that in the general case this is
>>> pretty intractable, it restricts the modifications that you can do in the
>>> future too much. That said, it shouldn't be as hard if you restrict
>>> yourself to the parts of the plan which are supported by the datasources V2
>>> API (which after all, need to be translatable properly into the future to
>>> support the mixins proposed). This should have a pretty small scope in
>>> comparison. As long as the user can bail out of nodes they don't
>>> understand, they should be ok, right?
>>>
>>> That said, what would also be fine for us is a place to plug into an
>>> unstable query plan.
>>>
>>> Explicitly here, what I'm looking for is a convenient mechanism to
>>> accept a fully specified set of arguments (of which I can choose to ignore
>>> some), and return the information as to which of them I'm ignoring. Taking
>>> a query plan of sorts is a way of doing this which IMO is intuitive to the
>>> user. It also provides a convenient location to plug in things like stats.
>>> Not at all married to the idea of using a query plan here; it just seemed
>>> convenient.
>>>
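One possible shape for "accept everything at once, report what is ignored" is sketched below. It is an assumption-laden illustration, not the proposed API: `Pushdown`, `BulkPushdownSource` and `ToySource` are hypothetical names, and filters and sort columns are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BulkPushdownSketch {
    // Hypothetical request: everything the engine would like pushed down.
    static final class Pushdown {
        final List<String> filters;   // e.g. "a = 1"
        final List<String> sortOrder; // e.g. "b", "c"

        Pushdown(List<String> filters, List<String> sortOrder) {
            this.filters = filters;
            this.sortOrder = sortOrder;
        }
    }

    // Hypothetical contract: return the part of the request the source
    // ignores, so the engine knows what it still has to evaluate itself.
    interface BulkPushdownSource {
        Pushdown unhandled(Pushdown requested);
    }

    // Toy source: handles only the filter "a = 1", and can serve the sort
    // (b, c) only when that filter is part of the same request.
    static final class ToySource implements BulkPushdownSource {
        @Override
        public Pushdown unhandled(Pushdown requested) {
            List<String> leftoverFilters = new ArrayList<>(requested.filters);
            boolean filtered = leftoverFilters.remove("a = 1");
            boolean sortHandled =
                    filtered && requested.sortOrder.equals(Arrays.asList("b", "c"));
            List<String> leftoverSort =
                    sortHandled ? new ArrayList<String>() : requested.sortOrder;
            return new Pushdown(leftoverFilters, leftoverSort);
        }
    }

    public static void main(String[] args) {
        Pushdown left = new ToySource().unhandled(
                new Pushdown(Arrays.asList("a = 1", "c > 3"), Arrays.asList("b", "c")));
        // prints "[c > 3] []"
        System.out.println(left.filters + " " + left.sortOrder);
    }
}
```

Because the source sees the filter and the sort together, it can make the decision in a single call and does not have to revise an earlier answer.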
>>> Regarding the users who just want to be able to pump data into Spark, my
>>> understanding is that replacing isolated nodes in a query plan is easy.
>>> That said, our goal here is to be able to push down as much as possible
>>> into the underlying datastore.
>>>
>>> To your second question:
>>>
>>> The issue is that if you build up pushdowns incrementally and not all at
>>> once, you end up having to reject pushdowns and filters that you actually
>>> can do, which unnecessarily increases overheads.
>>>
>>> For example, the dataset
>>>
>>> a b c
>>> 1 2 3
>>> 1 3 3
>>> 1 3 4
>>> 2 1 1
>>> 2 0 1
>>>
>>> can efficiently push down sort(b, c) if I have already applied the
>>> filter a = 1, but otherwise will force a sort in Spark. On the PR I detail
>>> a case I see where I can push down two equality filters iff I am given them
>>> at the same time, whilst not being able to one at a time.
>>>
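The dataset example above can be checked mechanically. The sketch below is only an illustration: the class and method names are made up, and the rows are held in memory in the order listed, which is an assumption about how such a source stores them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SortedAfterFilter {
    // Rows are {a, b, c}, in the order the hypothetical source stores them.
    static final List<int[]> ROWS = Arrays.asList(
            new int[] {1, 2, 3},
            new int[] {1, 3, 3},
            new int[] {1, 3, 4},
            new int[] {2, 1, 1},
            new int[] {2, 0, 1});

    // Keeps only rows whose column a equals the given value, in stored order.
    static List<int[]> filterA(List<int[]> rows, int a) {
        List<int[]> out = new ArrayList<>();
        for (int[] row : rows) {
            if (row[0] == a) {
                out.add(row);
            }
        }
        return out;
    }

    // True if the rows are already in non-decreasing (b, c) order.
    static boolean isSortedByBC(List<int[]> rows) {
        for (int i = 1; i < rows.size(); i++) {
            int[] prev = rows.get(i - 1);
            int[] cur = rows.get(i);
            if (prev[1] > cur[1] || (prev[1] == cur[1] && prev[2] > cur[2])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isSortedByBC(ROWS));             // prints "false"
        System.out.println(isSortedByBC(filterA(ROWS, 1))); // prints "true"
    }
}
```

The unfiltered rows are not in (b, c) order, but the rows with a = 1 are, so the sort is free only once the filter has been pushed down.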
>>> There are loads of cases like this - you can imagine someone being able
>>> to push down a sort before a filter is applied, but not afterwards.
>>> However, maybe the filter is so selective that it's better to push down the
>>> filter and not handle the sort. I don't get to make this decision, Spark
>>> does (but doesn't have good enough information to do it properly, whilst I
>>> do). I want to be able to choose the parts I push down given knowledge of
>>> my datasource - as defined the APIs don't let me do that, they're strictly
>>> more restrictive than the V1 APIs in this way.
>>>
>>> The pattern of not considering things that can be done in bulk bites us
>>> in other ways. The retrieval methods end up being trickier to implement
>>> than is necessary because frequently a single operation provides the result
>>> of many of the getters, but the state is mutable, so you end up with odd
>>> caches.
>>>
>>> For example, the work I need to do to answer unhandledFilters in V1 is
>>> roughly the same as the work I need to do to buildScan, so I want to cache
>>> it. This means that I end up with code that looks like:
>>>
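>>> import static java.util.Collections.emptyList;
>>>
>>> import com.google.common.base.Supplier;
>>> import com.google.common.base.Suppliers;
>>> import java.util.List;
>>>
>>> public final class CachingFoo implements Foo {
>>>     private final Foo delegate;
>>>
>>>     // Filters from the most recent call, and the memoized result for them.
>>>     private List<Filter> currentFilters = emptyList();
>>>     private Supplier<Bar> barSupplier = newSupplier(currentFilters);
>>>
>>>     public CachingFoo(Foo delegate) {
>>>         this.delegate = delegate;
>>>     }
>>>
>>>     // The delegate is only invoked on the first get() of the supplier.
>>>     private Supplier<Bar> newSupplier(List<Filter> filters) {
>>>         return Suppliers.memoize(() -> delegate.computeBar(filters));
>>>     }
>>>
>>>     @Override
>>>     public Bar computeBar(List<Filter> filters) {
>>>         // A different filter list invalidates the cached result.
>>>         if (!filters.equals(currentFilters)) {
>>>             currentFilters = filters;
>>>             barSupplier = newSupplier(filters);
>>>         }
>>>
>>>         return barSupplier.get();
>>>     }
>>> }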
>>>
>>> which caches the result required in unhandledFilters on the expectation
>>> that Spark will call buildScan afterwards and get to use the result.
>>>
>>> This kind of cache becomes more prominent, but harder to deal with in
>>> the new APIs. As one example here, the state I will need in order to
>>> compute accurate column stats internally will likely be a subset of the
>>> work required in order to get the read tasks, tell you if I can handle
>>> filters, etc, so I'll want to cache them for reuse. However, the cached
>>> information needs to be appropriately invalidated when I add a new filter
>>> or sort order or limit, and this makes implementing the APIs harder and
>>> more error-prone.
>>>
>>> One thing that'd be great is a defined contract of the order in which
>>> Spark calls the methods on your datasource (ideally this contract could be
>>> implied by the way the Java class structure works, but otherwise I can just
>>> throw).
>>>
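A call-order contract "implied by the class structure" could look something like the staged interfaces below. This is a sketch under assumptions: `AwaitingFilters`, `AwaitingSort`, `ReadyToScan` and `newReader` are hypothetical names, and the fixed filters-then-sort order is chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;

final class CallOrderSketch {
    // Stage 1: the only thing a caller can do is supply the filters.
    interface AwaitingFilters {
        AwaitingSort withFilters(List<String> filters);
    }

    // Stage 2: reachable only from stage 1, so the filters are known here.
    interface AwaitingSort {
        ReadyToScan withSort(List<String> sortColumns);
    }

    // Stage 3: all inputs are fixed, so nothing computed from them
    // ever needs to be invalidated.
    interface ReadyToScan {
        String describeScan();
    }

    // Each lambda implements one stage and returns the next one.
    static AwaitingFilters newReader() {
        return filters -> sortColumns ->
                () -> "scan filters=" + filters + " sort=" + sortColumns;
    }

    public static void main(String[] args) {
        String plan = newReader()
                .withFilters(Arrays.asList("a = 1"))
                .withSort(Arrays.asList("b", "c"))
                .describeScan();
        System.out.println(plan); // prints "scan filters=[a = 1] sort=[b, c]"
    }
}
```

Since each stage only exposes the next step, calling the methods out of order does not compile, and the implementer does not need to defend against it at runtime.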
>>> James
>>>
>>> On Tue, 29 Aug 2017 at 02:56 Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> James,
>>>>
>>>> Thanks for the comment. I think you just pointed out a trade-off
>>>> between expressiveness and API simplicity, compatibility and evolvability.
>>>> For the max expressiveness, we'd want the ability to expose full query
>>>> plans, and let the data source decide which part of the query plan can be
>>>> pushed down.
>>>>
>>>> The downsides to that (full query plan push down) are:
>>>>
>>>> 1. It is extremely difficult to design a stable representation for
>>>> logical / physical plan. It is doable, but we'd be the first to do it. I'm
>>>> not sure of any mainstream databases being able to do that in the past. The
>>>> design of that API itself, to make sure we have a good story for backward
>>>> and forward compatibility, would probably take months if not years. It
>>>> might still be good to do, or offer an experimental trait without
>>>> compatibility guarantee that uses the current Catalyst internal logical
>>>> plan.
>>>>
>>>> 2. Most data source developers simply want a way to offer some data,
>>>> without any pushdown. Having to understand query plans is a burden rather
>>>> than a gift.
>>>>
>>>>
>>>> Re: your point about the proposed v2 being worse than v1 for your use
>>>> case.
>>>>
>>>> Can you say more? You used the argument that in v2 there is more
>>>> support for broader pushdown and as a result it is harder to implement.
>>>> That's how it is supposed to be. If a data source simply implements one of
>>>> the traits, it'd be logically identical to v1. I don't see why it would be
>>>> worse or better, other than v2 provides much stronger forward compatibility
>>>> guarantees than v1.
>>>>
>>>>
>>>> On Tue, Aug 29, 2017 at 4:54 AM, James Baker <j.ba...@outlook.com>
>>>> wrote:
>>>>
>>>>> Copying from the code review comments I just submitted on the draft
>>>>> API (https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):
>>>>>
>>>>> Context here is that I've spent some time implementing a Spark
>>>>> datasource and have had some issues with the current API which are made
>>>>> worse in V2.
>>>>>
>>>>> The general conclusion I’ve come to here is that this is very hard to
>>>>> actually implement (in a similar but more aggressive way than DataSource
>>>>> V1, because of the extra methods and dimensions we get in V2).
>>>>>
>>>>> In DataSources V1 PrunedFilteredScan, the issue is that you are passed
>>>>> in the filters with the buildScan method, and then passed in again with
>>>>> the unhandledFilters method.
>>>>>
>>>>> However, the filters that you can’t handle might be data dependent,
>>>>> which the current API does not handle well. Suppose I can handle filter A
>>>>> some of the time, and filter B some of the time. If I’m passed in both,
>>>>> then either A and B are unhandled, or A, or B, or neither. The work I have
>>>>> to do to work this out is essentially the same as I have to do while
>>>>> actually generating my RDD (essentially I have to generate my partitions),
>>>>> so I end up doing some weird caching work.
>>>>>
>>>>> This V2 API proposal has the same issues, but perhaps more so. In
>>>>> PrunedFilteredScan, there is essentially one degree of freedom for pruning
>>>>> (filters), so you just have to implement caching between unhandledFilters
>>>>> and buildScan. However, here we have many degrees of freedom: sorts,
>>>>> individual filters, clustering, sampling, maybe aggregations eventually -
>>>>> and these operations are not all commutative, and computing my support
>>>>> one-by-one can easily end up being more expensive than computing it all in
>>>>> one go.
>>>>>
>>>>> For some trivial examples:
>>>>>
>>>>> - After filtering, I might be sorted, whilst before filtering I might
>>>>> not be.
>>>>>
>>>>> - Filtering with certain filters might affect my ability to push down
>>>>> others.
>>>>>
>>>>> - Filtering with aggregations (as mooted) might not be possible to
>>>>> push down.
>>>>>
>>>>> And with the API as currently mooted, I need to be able to go back and
>>>>> change my results because they might change later.
>>>>>
>>>>> Really what would be good here is to pass all of the filters and sorts
>>>>> etc all at once, and then I return the parts I can’t handle.
>>>>>
>>>>> I’d prefer in general that this be implemented by passing some kind of
>>>>> query plan to the datasource which enables this kind of replacement.
>>>>> Explicitly don’t want to give the whole query plan - that sounds painful -
>>>>> would prefer we push down only the parts of the query plan we deem to be
>>>>> stable. With the mix-in approach, I don’t think we can guarantee the
>>>>> properties we want without a two-phase thing - I’d really love to be able
>>>>> to just define a straightforward union type which is our supported
>>>>> pushdown stuff, and then the user can transform and return it.
>>>>>
>>>>> I think this ends up being a more elegant API for consumers, and also
>>>>> far more intuitive.
>>>>>
>>>>> James
>>>>>
>>>>> On Mon, 28 Aug 2017 at 18:00 蒋星博 <jiangxb1...@gmail.com> wrote:
>>>>>
>>>>>> +1 (Non-binding)
>>>>>>
>>>>>> On Mon, Aug 28, 2017 at 5:38 PM, Xiao Li <gatorsm...@gmail.com> wrote:
>>>>>>
>>>>>>> +1
>>>>>>>
>>>>>>> 2017-08-28 12:45 GMT-07:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>
>>>>>>>> Just wanted to point out that because the jira isn't labeled SPIP, it
>>>>>>>> won't have shown up linked from
>>>>>>>>
>>>>>>>> http://spark.apache.org/improvement-proposals.html
>>>>>>>>
>>>>>>>> On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> > Hi all,
>>>>>>>> >
>>>>>>>> > It has been almost 2 weeks since I proposed the data source V2 for
>>>>>>>> > discussion, and we have already got some feedback on the JIRA ticket
>>>>>>>> > and the prototype PR, so I'd like to call for a vote.
>>>>>>>> >
>>>>>>>> > The full document of the Data Source API V2 is:
>>>>>>>> > https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>>>>>>>> >
>>>>>>>> > Note that this vote should focus on the high-level design/framework,
>>>>>>>> > not specific APIs, as we can always change/improve specific APIs
>>>>>>>> > during development.
>>>>>>>> >
>>>>>>>> > The vote will be up for the next 72 hours. Please reply with your
>>>>>>>> > vote:
>>>>>>>> > +1: Yeah, let's go forward and implement the SPIP.
>>>>>>>> > +0: Don't really care.
>>>>>>>> > -1: I don't think this is a good idea because of the following
>>>>>>>> > technical reasons.
>>>>>>>> >
>>>>>>>> > Thanks!
>>>>>>>>
>>>>>>>
>>>>
>>
