Ah yeah sorry I got a bit mixed up.
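
To make the corrected statement concrete, here is a rough sketch of the kind of check a source could make: clustering is only worth the shuffle when partition keys repeat, that is, when cardinality is low relative to the row count. Everything named here (ClusteringBenefit, rowsPerKey, worthClustering, the 2.0 threshold) is made up for illustration and is not part of any Spark or Cassandra API. A real source would presumably work from statistics rather than a list of sampled keys.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ClusteringBenefit {
    // Hypothetical helper: average number of rows per distinct partition key
    // in a sample of the incoming data.
    static double rowsPerKey(List<String> sampledKeys) {
        if (sampledKeys.isEmpty()) {
            return 0.0;
        }
        Set<String> distinct = new HashSet<>(sampledKeys);
        return (double) sampledKeys.size() / distinct.size();
    }

    // Hypothetical rule: ask for clustering only when keys repeat often enough
    // for batching to pay for the shuffle. The threshold is an assumed knob.
    static boolean worthClustering(List<String> sampledKeys, double minRowsPerKey) {
        return rowsPerKey(sampledKeys) >= minRowsPerKey;
    }

    public static void main(String[] args) {
        List<String> lowCardinality = Arrays.asList("a", "a", "a", "b", "b", "b");
        List<String> allUnique = Arrays.asList("a", "b", "c", "d", "e", "f");
        System.out.println(worthClustering(lowCardinality, 2.0)); // prints true
        System.out.println(worthClustering(allUnique, 2.0));      // prints false
    }
}
```

With no duplicate keys the ratio is 1.0, so the sketch declines the shuffle, which matches the "no duplicates, no benefit" case described below.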

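Separately, on Ryan's day/category example further down the thread: one possible reading of "the sort satisfies the required clustering" is that the clustering columns make up the leading columns of the sort. The sketch below checks only that condition. It uses plain column names instead of Expression/SortOrder, ignores sort direction, and the class and method names are hypothetical. Whether a range-partitioned global sort would actually keep each cluster within one task is a separate question that this does not answer.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SortCoversClustering {
    // Returns true when the leading sort columns are exactly the clustering
    // columns (in any order), so rows with equal clustering values end up
    // adjacent in the sorted output.
    static boolean sortCoversClustering(List<String> clustering, List<String> sortColumns) {
        Set<String> required = new HashSet<>(clustering);
        if (sortColumns.size() < required.size()) {
            return false;
        }
        Set<String> prefix = new HashSet<>(sortColumns.subList(0, required.size()));
        return prefix.equals(required);
    }

    public static void main(String[] args) {
        List<String> clustering = Arrays.asList("day", "category");
        // Sort from the example: day ASC, category DESC, name ASC.
        System.out.println(sortCoversClustering(
            clustering, Arrays.asList("day", "category", "name"))); // prints true
        // A sort that leads with a non-clustering column does not qualify.
        System.out.println(sortCoversClustering(
            clustering, Arrays.asList("name", "day", "category"))); // prints false
    }
}
```
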
On Wed, Mar 28, 2018 at 7:54 PM Ted Yu <yuzhih...@gmail.com> wrote:

> bq. this shuffle could outweigh the benefits of the organized data if the
> cardinality is lower.
>
> I wonder if you meant higher in place of the last word above.
>
> Cheers
>
> On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> For added color, one thing that I may want to consider as a data source
>> implementer is the cost / benefit of applying a particular clustering. For
>> example, a dataset with low cardinality in the clustering key could benefit
>> greatly from clustering on that key before writing to Cassandra since
>> Cassandra can benefit from these sorts of batching. But the cost of
>> performing this shuffle could outweigh the benefits of the organized data
>> if the cardinality is lower.
>>
>> I imagine other sources might have similar benefit calculations. Doing a
>> particular sort or clustering can provide increased throughput but only in
>> certain situations based on some facts about the data.
>>
>>
>> For a concrete example here.
>>
>> Cassandra can insert records with the same partition-key faster if they
>> arrive in the same payload. But this is only beneficial if the incoming
>> dataset has multiple entries for the same partition key. If the incoming
>> source does not have any duplicates then there is no benefit to requiring a
>> sort or partitioning.
>>
>> On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody <patrick.woo...@gmail.com>
>> wrote:
>>
>>>> Spark would always apply the required clustering and sort order because
>>>> they are required by the data source. It is reasonable for a source to
>>>> reject data that isn’t properly prepared. For example, data must be written
>>>> to HTable files with keys in order or else the files are invalid. Sorting
>>>> should not be implemented in the sources themselves because Spark handles
>>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>>> why the interfaces start with “Requires”.
>>>
>>>
>>> This was in reference to Russell's suggestion that the data source could
>>> have a required sort, but only a recommended partitioning. I don't have an
>>> immediate use case for a recommendation in mind, though. I'm definitely in
>>> sync that the data source itself shouldn't do work outside of the writes
>>> themselves.
>>>
>>>
>>>> Considering the second use case you mentioned first, I don’t think it is
>>>> a good idea for a table to put requirements on the number of tasks used for
>>>> a write. The parallelism should be set appropriately for the data volume,
>>>> which is for Spark or the user to determine. A minimum or maximum number of
>>>> tasks could cause bad behavior.
>>>
>>>> For your first use case, an explicit global ordering, the problem is
>>>> that there can’t be an explicit global ordering for a table when it is
>>>> populated by a series of independent writes. Each write could have a global
>>>> order, but once those files are written, you have to deal with multiple
>>>> sorted data sets. I think it makes sense to focus on order within data
>>>> files, not order between data files.
>>>
>>>
>>> This is where I'm interested in learning about the separation of
>>> responsibilities for the data source and how "smart" it is supposed to be.
>>>
>>> For the first part, I would assume that given the estimated data size
>>> from Spark and options passed in from the user, the data source could make
>>> a more intelligent requirement on the write format than Spark
>>> independently. Somewhat analogous to how the current FileSource does bin
>>> packing of small files on the read side, restricting parallelism for the
>>> sake of overhead.
>>>
>>> For the second, I wouldn't assume that a data source requiring a certain
>>> write format would give any guarantees around reading the same data? In the
>>> cases where it is a complete overwrite it would, but for independent writes
>>> it could still be useful for statistics or compression.
>>>
>>> Thanks
>>> Pat
>>>
>>>
>>>
>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> How would Spark determine whether or not to apply a recommendation - a
>>>> cost threshold?
>>>>
>>>> Spark would always apply the required clustering and sort order because
>>>> they are required by the data source. It is reasonable for a source to
>>>> reject data that isn’t properly prepared. For example, data must be written
>>>> to HTable files with keys in order or else the files are invalid. Sorting
>>>> should not be implemented in the sources themselves because Spark handles
>>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>>> why the interfaces start with “Requires”.
>>>>
>>>> I’m not sure what the second half of your question means. What does
>>>> Spark need to pass into the data source?
>>>>
>>>> Should a datasource be able to provide a Distribution proper rather
>>>> than just the clustering expressions? Two use cases would be for explicit
>>>> global sorting of the dataset and attempting to ensure a minimum write task
>>>> size/number of write tasks.
>>>>
>>>> Considering the second use case you mentioned first, I don’t think it
>>>> is a good idea for a table to put requirements on the number of tasks used
>>>> for a write. The parallelism should be set appropriately for the data
>>>> volume, which is for Spark or the user to determine. A minimum or maximum
>>>> number of tasks could cause bad behavior.
>>>>
>>>> That said, I think there is a related use case for sharding. But that’s
>>>> really just a clustering by an expression with the shard calculation,
>>>> e.g., hash(id_col, 64). The shards should be handled as a cluster, but it
>>>> doesn’t matter how many tasks are used for it.
>>>>
>>>> For your first use case, an explicit global ordering, the problem is
>>>> that there can’t be an explicit global ordering for a table when it is
>>>> populated by a series of independent writes. Each write could have a global
>>>> order, but once those files are written, you have to deal with multiple
>>>> sorted data sets. I think it makes sense to focus on order within data
>>>> files, not order between data files.
>>>>
>>>>
>>>> On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody <
>>>> patrick.woo...@gmail.com> wrote:
>>>>
>>>>> How would Spark determine whether or not to apply a recommendation - a
>>>>> cost threshold? And yes, it would be good to flesh out what information we
>>>>> get from Spark in the datasource when providing these
>>>>> recommendations/requirements - I could see statistics and the existing
>>>>> outputPartitioning/Ordering of the child plan being used for providing the
>>>>> requirement.
>>>>>
>>>>> Should a datasource be able to provide a Distribution proper rather
>>>>> than just the clustering expressions? Two use cases would be for explicit
>>>>> global sorting of the dataset and attempting to ensure a minimum write
>>>>> task size/number of write tasks.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <
>>>>> russell.spit...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the clarification, definitely would want to require Sort
>>>>>> but only recommend partitioning ...  I think that would be useful to
>>>>>> request based on details about the incoming dataset.
>>>>>>
>>>>>> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> A required clustering would not, but a required sort would.
>>>>>>> Clustering is asking for the input dataframe's partitioning, and sorting
>>>>>>> would be how each partition is sorted.
>>>>>>>
>>>>>>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>>>>>>> russell.spit...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I forgot since it's been a while, but does Clustering support allow
>>>>>>>> requesting that partitions contain elements in order as well? That
>>>>>>>> would be a useful trick for me. IE
>>>>>>>> Request/Require(SortedOn(Col1))
>>>>>>>> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>>>>>>>>
>>>>>>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, it makes sense that the existing interface is for
>>>>>>>>> aggregation and not joins. Why are there requirements for the number
>>>>>>>>> of partitions that are returned then?
>>>>>>>>>
>>>>>>>>> Does it make sense to design the write-side `Requirement` classes
>>>>>>>>> and the read-side reporting separately?
>>>>>>>>>
>>>>>>>>> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan <cloud0...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ryan, yea you are right that SupportsReportPartitioning
>>>>>>>>>> doesn't expose a hash function, so Join can't benefit from this
>>>>>>>>>> interface, as Join doesn't require a general ClusteredDistribution,
>>>>>>>>>> but a more specific one called HashClusteredDistribution.
>>>>>>>>>>
>>>>>>>>>> So currently only Aggregate can benefit from
>>>>>>>>>> SupportsReportPartitioning and save a shuffle. We can add a new
>>>>>>>>>> interface to expose the hash function to make it work for Join.
>>>>>>>>>>
>>>>>>>>>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue <rb...@netflix.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I just took a look at SupportsReportPartitioning and I'm not
>>>>>>>>>>> sure that it will work for real use cases. It doesn't specify, as
>>>>>>>>>>> far as I can tell, a hash function for combining clusters into tasks
>>>>>>>>>>> or a way to provide Spark a hash function for the other side of a
>>>>>>>>>>> join. It seems unlikely to me that many data sources would have
>>>>>>>>>>> partitioning that happens to match the other side of a join. And, it
>>>>>>>>>>> looks like task order matters? Maybe I'm missing something?
>>>>>>>>>>>
>>>>>>>>>>> I think that we should design the write side independently based
>>>>>>>>>>> on what data stores actually need, and take a look at the read side
>>>>>>>>>>> based on what data stores can actually provide. Wenchen, was there a
>>>>>>>>>>> design doc for partitioning on the read path?
>>>>>>>>>>>
>>>>>>>>>>> I completely agree with your point about a global sort. We
>>>>>>>>>>> recommend to all of our data engineers to add a sort to most tables
>>>>>>>>>>> because it introduces the range partitioner and does a skew
>>>>>>>>>>> calculation, in addition to making data filtering much better when it
>>>>>>>>>>> is read. It's really common for tables to be skewed by partition
>>>>>>>>>>> values.
>>>>>>>>>>>
>>>>>>>>>>> rb
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>>>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Ryan, Ted, Wenchen
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the quick replies.
>>>>>>>>>>>>
>>>>>>>>>>>> @Ryan - the sorting portion makes sense, but I think we'd have
>>>>>>>>>>>> to ensure something similar to requiredChildDistribution in
>>>>>>>>>>>> SparkPlan where we have the number of partitions as well if we'd
>>>>>>>>>>>> want to further report to SupportsReportPartitioning, yeah?
>>>>>>>>>>>>
>>>>>>>>>>>> Specifying an explicit global sort can also be useful for
>>>>>>>>>>>> filtering purposes on Parquet row group stats if we have a time
>>>>>>>>>>>> based/high cardinality ID field. If my datasource or catalog knows
>>>>>>>>>>>> about previous queries on a table, it could be really useful to
>>>>>>>>>>>> recommend more appropriate formatting for consumers on the next
>>>>>>>>>>>> materialization. The same would be true of clustering on commonly
>>>>>>>>>>>> joined fields.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks again
>>>>>>>>>>>> Pat
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hmm. Ryan seems to be right.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Looking at
>>>>>>>>>>>>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>>>>>>>>>>>>
>>>>>>>>>>>>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>>>>>>>>>>>>> ...
>>>>>>>>>>>>>   Partitioning outputPartitioning();
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <
>>>>>>>>>>>>> cloud0...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Actually clustering is already supported, please take a look
>>>>>>>>>>>>>> at SupportsReportPartitioning
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ordering is not proposed yet, might be similar to what Ryan
>>>>>>>>>>>>>> proposed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Interesting.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Should requiredClustering return a Set of Expressions?
>>>>>>>>>>>>>>> This way, we can determine the order of Expressions by
>>>>>>>>>>>>>>> looking at what requiredOrdering() returns.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <
>>>>>>>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Pat,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for starting the discussion on this, we’re really
>>>>>>>>>>>>>>>> interested in it as well. I don’t think there is a proposed API yet,
>>>>>>>>>>>>>>>> but I was thinking something like this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> interface RequiresClustering {
>>>>>>>>>>>>>>>>   List<Expression> requiredClustering();
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> interface RequiresSort {
>>>>>>>>>>>>>>>>   List<SortOrder> requiredOrdering();
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The reason why RequiresClustering should provide Expression
>>>>>>>>>>>>>>>> is that it needs to be able to customize the implementation. For
>>>>>>>>>>>>>>>> example, writing to HTable would require building a key (or the data
>>>>>>>>>>>>>>>> for a key) and that might use a hash function that differs from
>>>>>>>>>>>>>>>> Spark’s built-ins.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> RequiresSort is fairly straightforward, but the
>>>>>>>>>>>>>>>> interaction between the two requirements deserves some consideration.
>>>>>>>>>>>>>>>> To make the two compatible, I think that RequiresSort must be
>>>>>>>>>>>>>>>> interpreted as a sort within each partition of the clustering, but
>>>>>>>>>>>>>>>> could possibly be used for a global sort when the two overlap.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For example, if I have a table partitioned by “day” and
>>>>>>>>>>>>>>>> “category” then the RequiredClustering would be by day,
>>>>>>>>>>>>>>>> category. A required sort might be day ASC, category DESC,
>>>>>>>>>>>>>>>> name ASC. Because that sort satisfies the required
>>>>>>>>>>>>>>>> clustering, it could be used for a global ordering. But, is that
>>>>>>>>>>>>>>>> useful? How would the global ordering matter beyond a sort within
>>>>>>>>>>>>>>>> each partition, i.e., how would the partition’s place in the global
>>>>>>>>>>>>>>>> ordering be passed?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> To your other questions, you might want to have a look at
>>>>>>>>>>>>>>>> the recent SPIP I’m working on to consolidate and clean up
>>>>>>>>>>>>>>>> logical plans
>>>>>>>>>>>>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>>>>>>>>>>>>> That proposes more specific uses for the DataSourceV2 API that should
>>>>>>>>>>>>>>>> help clarify what validation needs to take place. As for custom
>>>>>>>>>>>>>>>> catalyst rules, I’d like to hear about the use cases to see if we can
>>>>>>>>>>>>>>>> build it into these improvements.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> rb
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>>>>>>>>>>>>> patrick.woo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I saw in some of the discussions around DataSourceV2
>>>>>>>>>>>>>>>>> writes that we might have the data source inform Spark of
>>>>>>>>>>>>>>>>> requirements for the input data's ordering and partitioning. Has
>>>>>>>>>>>>>>>>> there been a proposed API for that yet?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Even one level up it would be helpful to understand how I
>>>>>>>>>>>>>>>>> should be thinking about the responsibility of the data source
>>>>>>>>>>>>>>>>> writer, when I should be inserting a custom catalyst rule, and how I
>>>>>>>>>>>>>>>>> should handle validation/assumptions of the table before attempting
>>>>>>>>>>>>>>>>> the write.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>> Pat
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Ryan Blue
>>>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>>>> Netflix
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Software Engineer
>>>>>>>>> Netflix
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>
