> Cassandra can insert records with the same partition key faster if they
> arrive in the same payload. But this is only beneficial if the incoming
> dataset has multiple entries for the same partition key.

Thanks for the example, the recommended partitioning use case makes more
sense now. I think we could have two interfaces, a RequiresClustering and a
RecommendsClustering if we want to support this. But I’m skeptical it will
be useful for two reasons:

   - Do we want to optimize the low cardinality case? Shuffles are usually
   much cheaper at smaller sizes, so I’m not sure it is necessary to optimize
   this away.
   - How do we know there aren’t just a few partition keys for all the
   records? It may look like a shuffle wouldn’t help, but we don’t know the
   partition keys until it is too late.

Then there’s also the logic for avoiding the shuffle and how to calculate
the cost, which sounds like something that needs some details from CBO.
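
To make the two-interface idea concrete, here is a minimal sketch. Everything in it is hypothetical: RecommendsClustering is only a name floated in this thread, column names stand in for Spark's Expression type, and the shuffleLooksWorthwhile check is a placeholder for whatever cost-based logic Spark would need.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-ins: plain column names replace Spark's Expression type.
interface RequiresClustering {
  List<String> requiredClustering();
}

// Hypothetical: a source that would like, but does not need, clustered input.
interface RecommendsClustering {
  List<String> recommendedClustering();
}

final class ClusteringPlanSketch {
  private ClusteringPlanSketch() {}

  /**
   * Returns the columns to shuffle by before the write, or an empty list
   * when no shuffle is added. A requirement is always honored; a
   * recommendation is honored only when the supplied check (an assumed
   * placeholder for a cost estimate) says the shuffle is worthwhile.
   */
  static List<String> clusteringToApply(Object writer, BooleanSupplier shuffleLooksWorthwhile) {
    if (writer instanceof RequiresClustering) {
      return ((RequiresClustering) writer).requiredClustering();
    }
    if (writer instanceof RecommendsClustering && shuffleLooksWorthwhile.getAsBoolean()) {
      return ((RecommendsClustering) writer).recommendedClustering();
    }
    return Collections.emptyList();
  }
}
```

The sketch leaves the hard part, deciding when a recommended shuffle pays off, behind the BooleanSupplier, which is exactly the piece that would need details from CBO.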

> I would assume that given the estimated data size from Spark and options
> passed in from the user, the data source could make a more intelligent
> requirement on the write format than Spark independently.

This is a good point.

What would an implementation actually do here and how would information be
passed? For my use cases, the store would produce the number of tasks based
on the estimated incoming rows, because the source has the best idea of how
the rows will compress. But, that’s just applying a multiplier most of the
time. To be very useful, this would have to handle skew in the rows (think
of rows with a type field, where total size depends on the type) and that’s a
bit harder.
I think maybe an interface that can provide relative cost estimates based
on partition keys would be helpful, but then keep the planning logic in
Spark.
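
As a rough illustration of that split, the sketch below has the source report only a relative cost per row for each partition key, while the planning (how many tasks per key) stays on the Spark side. The interface name, the method, and the costPerTask parameter are all assumptions made up for this example, not an existing API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical interface: the source reports how expensive a row is to
// write for a given partition key, relative to a baseline of 1.0.
interface ReportsRelativeWriteCost {
  double relativeCostPerRow(String partitionKey);
}

final class WriteTaskPlannerSketch {
  private WriteTaskPlannerSketch() {}

  /**
   * Planner-side logic: combines estimated row counts with the source's
   * relative costs and assigns at least one task per partition key.
   * costPerTask is an assumed tuning knob for how much work one task takes.
   */
  static Map<String, Integer> tasksPerKey(
      Map<String, Long> estimatedRows, ReportsRelativeWriteCost source, double costPerTask) {
    Map<String, Integer> tasks = new LinkedHashMap<>();
    for (Map.Entry<String, Long> entry : estimatedRows.entrySet()) {
      double cost = entry.getValue() * source.relativeCostPerRow(entry.getKey());
      int count = (int) Math.max(1.0, Math.ceil(cost / costPerTask));
      tasks.put(entry.getKey(), count);
    }
    return tasks;
  }
}
```

With equal row counts, a key whose rows are four times as expensive ends up with four times as many tasks, which is the skew case a plain multiplier would miss.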

This is probably something that we could add later as we find use cases
that require it?

> I wouldn’t assume that a data source requiring a certain write format would
> give any guarantees around reading the same data? In the cases where it is
> a complete overwrite it would, but for independent writes it could still be
> useful for statistics or compression.

Right, you could use this to store a global ordering if there is only one
write (e.g., CTAS). I don’t think anything needs to change in that case,
you would still have a clustering and an ordering, but the ordering would
need to include all fields of the clustering. A way to pass in the
partition ordinal for the source to store would be required.
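
One way to read "the ordering would need to include all fields of the clustering" is as a prefix check: the leading sort columns must be exactly the clustering columns, in any order. The sketch below encodes that reading. It is an assumption about the rule, it uses plain column names instead of Spark's Expression and SortOrder, and it ignores sort direction.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class OrderingCoversClusteringSketch {
  private OrderingCoversClusteringSketch() {}

  /**
   * Returns true when the first columns of the ordering are exactly the
   * clustering columns (in any order), so that sorting by the ordering
   * also keeps rows of the same cluster together.
   */
  static boolean orderingCoversClustering(List<String> clustering, List<String> orderingColumns) {
    Set<String> required = new HashSet<>(clustering);
    if (orderingColumns.size() < required.size()) {
      return false;
    }
    Set<String> prefix = new HashSet<>(orderingColumns.subList(0, required.size()));
    return prefix.equals(required);
  }
}
```

For a clustering of (day, category), an ordering of (day, category, name) passes the check, while (name, day, category) does not because name comes before the clustering columns.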

For the second point that ordering is useful for statistics and
compression, I completely agree. Our best practices doc tells users to
always add a global sort when writing because you get the benefit of a
range partitioner to handle skew, plus the stats and compression you’re
talking about to optimize for reads. I think the proposed API can request a
global ordering from Spark already. My only point is that there isn’t much
the source can do to guarantee ordering for reads when there is more than
one write.
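
For readers unfamiliar with why a global sort helps with skew, here is a toy illustration of range partitioning from a sample. It is not Spark's RangePartitioner, only the underlying idea: bounds are taken from the data itself, so dense key ranges are split across more partitions than sparse ones. The class and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RangeBoundsSketch {
  private RangeBoundsSketch() {}

  /** Picks numPartitions - 1 upper bounds at evenly spaced positions in the sorted sample. */
  static List<Integer> bounds(List<Integer> sample, int numPartitions) {
    if (numPartitions < 1 || sample.size() < numPartitions) {
      throw new IllegalArgumentException("need at least one sampled key per partition");
    }
    List<Integer> sorted = new ArrayList<>(sample);
    Collections.sort(sorted);
    List<Integer> bounds = new ArrayList<>();
    for (int i = 1; i < numPartitions; i++) {
      bounds.add(sorted.get(i * sorted.size() / numPartitions - 1));
    }
    return bounds;
  }

  /** Returns the index of the first range whose upper bound is >= key. */
  static int partitionFor(int key, List<Integer> bounds) {
    int partition = 0;
    while (partition < bounds.size() && key > bounds.get(partition)) {
      partition++;
    }
    return partition;
  }
}
```

With sampled keys 1 through 6 plus 50 and 100 and four partitions, the bounds come out as 2, 4, 6: the dense range 1..6 is spread over three partitions and the sparse tail shares the last one.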

On Wed, Mar 28, 2018 at 7:14 PM, Patrick Woody <patrick.woo...@gmail.com>
wrote:

>> Spark would always apply the required clustering and sort order because
>> they are required by the data source. It is reasonable for a source to
>> reject data that isn’t properly prepared. For example, data must be written
>> to HTable files with keys in order or else the files are invalid. Sorting
>> should not be implemented in the sources themselves because Spark handles
>> concerns like spilling to disk. Spark must prepare data correctly, which is
>> why the interfaces start with “Requires”.
>
>
> This was in reference to Russell's suggestion that the data source could
> have a required sort, but only a recommended partitioning. I don't have an
> immediate use case for a recommendation in mind, though. I'm
> definitely in sync that the data source itself shouldn't do work outside of
> the writes themselves.
>
>> Considering the second use case you mentioned first, I don’t think it is a
>> good idea for a table to put requirements on the number of tasks used for a
>> write. The parallelism should be set appropriately for the data volume,
>> which is for Spark or the user to determine. A minimum or maximum number of
>> tasks could cause bad behavior.
>
>
>> For your first use case, an explicit global ordering, the problem is that
>> there can’t be an explicit global ordering for a table when it is populated
>> by a series of independent writes. Each write could have a global order,
>> but once those files are written, you have to deal with multiple sorted
>> data sets. I think it makes sense to focus on order within data files, not
>> order between data files.
>
>
> This is where I'm interested in learning about the separation of
> responsibilities for the data source and how "smart" it is supposed to be.
>
> For the first part, I would assume that given the estimated data size from
> Spark and options passed in from the user, the data source could make a
> more intelligent requirement on the write format than Spark independently.
> Somewhat analogous to how the current FileSource does bin packing of small
> files on the read side, restricting parallelism for the sake of overhead.
>
> For the second, I wouldn't assume that a data source requiring a certain
> write format would give any guarantees around reading the same data? In the
> cases where it is a complete overwrite it would, but for independent writes
> it could still be useful for statistics or compression.
>
> Thanks
> Pat
>
>
>
> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:
>
>> How would Spark determine whether or not to apply a recommendation - a
>> cost threshold?
>>
>> Spark would always apply the required clustering and sort order because
>> they are required by the data source. It is reasonable for a source to
>> reject data that isn’t properly prepared. For example, data must be written
>> to HTable files with keys in order or else the files are invalid. Sorting
>> should not be implemented in the sources themselves because Spark handles
>> concerns like spilling to disk. Spark must prepare data correctly, which is
>> why the interfaces start with “Requires”.
>>
>> I’m not sure what the second half of your question means. What does Spark
>> need to pass into the data source?
>>
>> Should a datasource be able to provide a Distribution proper rather than
>> just the clustering expressions? Two use cases would be for explicit global
>> sorting of the dataset and attempting to ensure a minimum write task
>> size/number of write tasks.
>>
>> Considering the second use case you mentioned first, I don’t think it is
>> a good idea for a table to put requirements on the number of tasks used for
>> a write. The parallelism should be set appropriately for the data volume,
>> which is for Spark or the user to determine. A minimum or maximum number of
>> tasks could cause bad behavior.
>>
>> That said, I think there is a related use case for sharding. But that’s
>> really just a clustering by an expression with the shard calculation, e.g.,
>> hash(id_col, 64). The shards should be handled as a cluster, but it doesn’t
>> matter how many tasks are used for it.
>>
>> For your first use case, an explicit global ordering, the problem is that
>> there can’t be an explicit global ordering for a table when it is populated
>> by a series of independent writes. Each write could have a global order,
>> but once those files are written, you have to deal with multiple sorted
>> data sets. I think it makes sense to focus on order within data files, not
>> order between data files.
>>
>> On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <patrick.woo...@gmail.com>
>> wrote:
>>
>>> How would Spark determine whether or not to apply a recommendation - a
>>> cost threshold? And yes, it would be good to flesh out what information we
>>> get from Spark in the datasource when providing these
>>> recommendations/requirements - I could see statistics and the existing
>>> outputPartitioning/Ordering of the child plan being used for providing the
>>> requirement.
>>>
>>> Should a datasource be able to provide a Distribution proper rather than
>>> just the clustering expressions? Two use cases would be for explicit global
>>> sorting of the dataset and attempting to ensure a minimum write task
>>> size/number of write tasks.
>>>
>>>
>>>
>>> On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <
>>> russell.spit...@gmail.com> wrote:
>>>
>>>> Thanks for the clarification, definitely would want to require Sort but
>>>> only recommend partitioning ...  I think that would be useful to request
>>>> based on details about the incoming dataset.
>>>>
>>>> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> A required clustering would not, but a required sort would. Clustering
>>>>> is asking for the input dataframe's partitioning, and sorting would be how
>>>>> each partition is sorted.
>>>>>
>>>>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>>>>> russell.spit...@gmail.com> wrote:
>>>>>
>>>>>> I forgot since it's been a while, but does Clustering support allow
>>>>>> requesting that partitions contain elements in order as well? That would
>>>>>> be a useful trick for me, e.g.:
>>>>>> Request/Require(SortedOn(Col1))
>>>>>> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>>>>>>
>>>>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks, it makes sense that the existing interface is for
>>>>>>> aggregation and not joins. Why are there requirements for the number of
>>>>>>> partitions that are returned then?
>>>>>>>
>>>>>>> Does it make sense to design the write-side `Requirement` classes
>>>>>>> and the read-side reporting separately?
>>>>>>>
>>>>>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
>>>>>>>> expose a hash function, so Join can't benefit from this interface, as
>>>>>>>> Join doesn't require a general ClusteredDistribution, but a more
>>>>>>>> specific one called HashClusteredDistribution.
>>>>>>>>
>>>>>>>> So currently only Aggregate can benefit from
>>>>>>>> SupportsReportPartitioning and save a shuffle. We can add a new
>>>>>>>> interface to expose the hash function to make it work for Join.
>>>>>>>>
>>>>>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I just took a look at SupportsReportPartitioning and I'm not sure
>>>>>>>>> that it will work for real use cases. It doesn't specify, as far as I
>>>>>>>>> can tell, a hash function for combining clusters into tasks or a way
>>>>>>>>> to provide Spark a hash function for the other side of a join. It
>>>>>>>>> seems unlikely to me that many data sources would have partitioning
>>>>>>>>> that happens to match the other side of a join. And, it looks like
>>>>>>>>> task order matters? Maybe I'm missing something?
>>>>>>>>>
>>>>>>>>> I think that we should design the write side independently based
>>>>>>>>> on what data stores actually need, and take a look at the read side
>>>>>>>>> based on what data stores can actually provide. Wenchen, was there a
>>>>>>>>> design doc for partitioning on the read path?
>>>>>>>>>
>>>>>>>>> I completely agree with your point about a global sort. We
>>>>>>>>> recommend to all of our data engineers to add a sort to most tables
>>>>>>>>> because it introduces the range partitioner and does a skew
>>>>>>>>> calculation, in addition to making data filtering much better when it
>>>>>>>>> is read. It's really common for tables to be skewed by partition
>>>>>>>>> values.
>>>>>>>>>
>>>>>>>>> rb
>>>>>>>>>
>>>>>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Ryan, Ted, Wenchen
>>>>>>>>>>
>>>>>>>>>> Thanks for the quick replies.
>>>>>>>>>>
>>>>>>>>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>>>>>>>>> ensure something similar to requiredChildDistribution in SparkPlan
>>>>>>>>>> where we have the number of partitions as well if we'd want to
>>>>>>>>>> further report to SupportsReportPartitioning, yeah?
>>>>>>>>>>
>>>>>>>>>> Specifying an explicit global sort can also be useful for
>>>>>>>>>> filtering purposes on Parquet row group stats if we have a time
>>>>>>>>>> based/high cardinality ID field. If my datasource or catalog knows
>>>>>>>>>> about previous queries on a table, it could be really useful to
>>>>>>>>>> recommend more appropriate formatting for consumers on the next
>>>>>>>>>> materialization. The same would be true of clustering on commonly
>>>>>>>>>> joined fields.
>>>>>>>>>>
>>>>>>>>>> Thanks again
>>>>>>>>>> Pat
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>>>>>
>>>>>>>>>>> Looking at
>>>>>>>>>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>>>>>> ...
>>>>>>>>>>>   Partitioning outputPartitioning();
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <
>>>>>>>>>>> cloud0...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Actually clustering is already supported, please take a look at
>>>>>>>>>>>> SupportsReportPartitioning
>>>>>>>>>>>>
>>>>>>>>>>>> Ordering is not proposed yet, might be similar to what Ryan
>>>>>>>>>>>> proposed.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Interesting.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Should requiredClustering return a Set of Expressions?
>>>>>>>>>>>>> This way, we can determine the order of the Expressions by
>>>>>>>>>>>>> looking at what requiredOrdering() returns.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Pat,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for starting the discussion on this, we’re really
>>>>>>>>>>>>>> interested in it as well. I don’t think there is a proposed API
>>>>>>>>>>>>>> yet, but I was thinking something like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The reason why RequiresClustering should provide Expression
>>>>>>>>>>>>>> is that it needs to be able to customize the implementation. For
>>>>>>>>>>>>>> example, writing to HTable would require building a key (or the
>>>>>>>>>>>>>> data for a key) and that might use a hash function that differs
>>>>>>>>>>>>>> from Spark’s built-ins.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> RequiresSort is fairly straightforward, but the interaction
>>>>>>>>>>>>>> between the two requirements deserves some consideration. To
>>>>>>>>>>>>>> make the two compatible, I think that RequiresSort must be
>>>>>>>>>>>>>> interpreted as a sort within each partition of the clustering,
>>>>>>>>>>>>>> but could possibly be used for a global sort when the two overlap.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, if I have a table partitioned by “day” and
>>>>>>>>>>>>>> “category” then the required clustering would be by day,
>>>>>>>>>>>>>> category. A required sort might be day ASC, category DESC,
>>>>>>>>>>>>>> name ASC. Because that sort satisfies the required
>>>>>>>>>>>>>> clustering, it could be used for a global ordering. But, is that
>>>>>>>>>>>>>> useful? How would the global ordering matter beyond a sort within
>>>>>>>>>>>>>> each partition, i.e., how would the partition’s place in the
>>>>>>>>>>>>>> global ordering be passed?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To your other questions, you might want to have a look at the
>>>>>>>>>>>>>> recent SPIP I’m working on to consolidate and clean up
>>>>>>>>>>>>>> logical plans
>>>>>>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that
>>>>>>>>>>>>>> should help clarify what validation needs to take place. As for
>>>>>>>>>>>>>> custom catalyst rules, I’d like to hear about the use cases to
>>>>>>>>>>>>>> see if we can build it into these improvements.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> rb
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I saw in some of the discussions around DataSourceV2 writes
>>>>>>>>>>>>>>> that we might have the data source inform Spark of requirements
>>>>>>>>>>>>>>> for the input data's ordering and partitioning. Has there been a
>>>>>>>>>>>>>>> proposed API for that yet?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Even one level up it would be helpful to understand how I
>>>>>>>>>>>>>>> should be thinking about the responsibility of the data source
>>>>>>>>>>>>>>> writer, when I should be inserting a custom catalyst rule, and
>>>>>>>>>>>>>>> how I should handle validation/assumptions of the table before
>>>>>>>>>>>>>>> attempting the write.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>> Pat
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>> Netflix
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix
