I just took a look at SupportsReportPartitioning and I'm not sure that it
will work for real use cases. It doesn't specify, as far as I can tell, a
hash function for combining clusters into tasks or a way to provide Spark a
hash function for the other side of a join. It seems unlikely to me that
many data sources would have partitioning that happens to match the other
side of a join. And, it looks like task order matters? Maybe I'm missing
something?
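
To make the concern concrete, here is a rough sketch of the kind of contract
that seems to be missing. Everything in it is hypothetical: ClusterHash,
taskFor, and coPartitioned are illustrative names, not part of DataSourceV2
or SupportsReportPartitioning. It only shows that two sides of a join line
up when both use the same function from cluster key to task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ClusterHashSketch {
    /** Hypothetical contract: maps a cluster key to one of numTasks tasks. */
    interface ClusterHash {
        int taskFor(List<Object> clusterKey, int numTasks);
    }

    /** One possible function, built on List.hashCode(). */
    static final ClusterHash JAVA_HASH =
        (key, numTasks) -> Math.floorMod(key.hashCode(), numTasks);

    /** A different function, standing in for a source-specific hash. */
    static final ClusterHash OTHER_HASH =
        (key, numTasks) -> Math.floorMod(31 * key.hashCode() + 7, numTasks);

    /** True when both sides send every given key to the same task. */
    static boolean coPartitioned(ClusterHash left, ClusterHash right,
                                 List<List<Object>> keys, int numTasks) {
        for (List<Object> key : keys) {
            if (left.taskFor(key, numTasks) != right.taskFor(key, numTasks)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Object>> keys = new ArrayList<>();
        keys.add(Arrays.<Object>asList("2018-03-26", "books"));
        keys.add(Arrays.<Object>asList("2018-03-27", "music"));

        // Same function on both sides: rows with equal keys meet in one task.
        System.out.println(coPartitioned(JAVA_HASH, JAVA_HASH, keys, 8));
        // Different functions: the reported clustering cannot be reused as is.
        System.out.println(coPartitioned(JAVA_HASH, OTHER_HASH, keys, 8));
    }
}
```

Without some way to exchange a function like this, I don't see how Spark
could shuffle the other side of the join to match the source.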

I think that we should design the write side independently based on what
data stores actually need, and take a look at the read side based on what
data stores can actually provide. Wenchen, was there a design doc for
partitioning on the read path?

I completely agree with your point about a global sort. We recommend that
all of our data engineers add a sort to most tables because it introduces
the range partitioner and does a skew calculation, in addition to making
data filtering much better when the data is read. It's really common for
tables to be skewed by partition values.
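
As a toy illustration of why the range partitioner helps with skew, here is
a simplified sketch. It is not Spark's implementation: the class and method
names are made up, the data is invented, and it uses every key as the
"sample" instead of sampling. It compares hashing on a skewed partition
value (day) with range bounds chosen over a (day, id) sort key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RangeVsHashSketch {
    /** Invented data: 90 rows for one day, 10 for the next, keyed "day/id". */
    static List<String> skewedKeys() {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 90; i++) keys.add(String.format("2018-03-26/%03d", i));
        for (int i = 0; i < 10; i++) keys.add(String.format("2018-03-27/%03d", i));
        return keys;
    }

    /** Picks numTasks - 1 bounds at even quantiles of the sorted sample. */
    static List<String> rangeBounds(List<String> sample, int numTasks) {
        List<String> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<String> bounds = new ArrayList<>();
        for (int i = 1; i < numTasks; i++) {
            bounds.add(sorted.get(sorted.size() * i / numTasks));
        }
        return bounds;
    }

    /** Rows per task when a key goes to the task whose range contains it. */
    static int[] countByRange(List<String> keys, int numTasks) {
        List<String> bounds = rangeBounds(keys, numTasks);
        int[] counts = new int[numTasks];
        for (String key : keys) {
            int task = 0;
            // Task index is the number of bounds that are <= key.
            while (task < bounds.size() && bounds.get(task).compareTo(key) <= 0) {
                task++;
            }
            counts[task]++;
        }
        return counts;
    }

    /** Rows per task when rows are clustered by hashing only the day. */
    static int[] countByDayHash(List<String> keys, int numTasks) {
        int[] counts = new int[numTasks];
        for (String key : keys) {
            String day = key.substring(0, 10);
            counts[Math.floorMod(day.hashCode(), numTasks)]++;
        }
        return counts;
    }

    static int largest(int[] counts) {
        int max = 0;
        for (int c : counts) max = Math.max(max, c);
        return max;
    }

    public static void main(String[] args) {
        List<String> keys = skewedKeys();
        System.out.println("largest task, hash by day: "
            + largest(countByDayHash(keys, 4)));
        System.out.println("largest task, range on (day, id): "
            + largest(countByRange(keys, 4)));
    }
}
```

With hashing on day alone, the busy day lands in a single task no matter
how many tasks there are. The range bounds split that day across tasks
because they are chosen from the distribution of the full sort key.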

rb

On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <patrick.woo...@gmail.com>
wrote:

> Hey Ryan, Ted, Wenchen
>
> Thanks for the quick replies.
>
> @Ryan - the sorting portion makes sense, but I think we'd have to ensure
> something similar to requiredChildDistribution in SparkPlan where we have
> the number of partitions as well if we'd want to further report to
> SupportsReportPartitioning, yeah?
>
> Specifying an explicit global sort can also be useful for filtering
> purposes on Parquet row group stats if we have a time based/high
> cardinality ID field. If my datasource or catalog knows about previous
> queries on a table, it could be really useful to recommend more appropriate
> formatting for consumers on the next materialization. The same would be
> true of clustering on commonly joined fields.
>
> Thanks again
> Pat
>
>
>
> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Hmm. Ryan seems to be right.
>>
>> Looking at
>> sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java:
>>
>> import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
>> ...
>>   Partitioning outputPartitioning();
>>
>> On Mon, Mar 26, 2018 at 6:58 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> Actually, clustering is already supported; please take a look at
>>> SupportsReportPartitioning.
>>>
>>> Ordering has not been proposed yet; it might be similar to what Ryan
>>> proposed.
>>>
>>> On Mon, Mar 26, 2018 at 6:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Interesting.
>>>>
>>>> Should requiredClustering return a Set of Expressions?
>>>> This way, we can determine the order of the Expressions by looking at
>>>> what requiredOrdering() returns.
>>>>
>>>> On Mon, Mar 26, 2018 at 5:45 PM, Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Hi Pat,
>>>>>
>>>>> Thanks for starting the discussion on this, we’re really interested in
>>>>> it as well. I don’t think there is a proposed API yet, but I was thinking
>>>>> something like this:
>>>>>
>>>>> interface RequiresClustering {
>>>>>   List<Expression> requiredClustering();
>>>>> }
>>>>>
>>>>> interface RequiresSort {
>>>>>   List<SortOrder> requiredOrdering();
>>>>> }
>>>>>
>>>>> The reason why RequiresClustering should provide Expression is that
>>>>> it needs to be able to customize the implementation. For example, writing
>>>>> to HTable would require building a key (or the data for a key) and that
>>>>> might use a hash function that differs from Spark’s built-ins.
>>>>> RequiresSort is fairly straightforward, but the interaction between
>>>>> the two requirements deserves some consideration. To make the two
>>>>> compatible, I think that RequiresSort must be interpreted as a sort
>>>>> within each partition of the clustering, but could possibly be used for a
>>>>> global sort when the two overlap.
>>>>>
>>>>> For example, if I have a table partitioned by “day” and “category”
>>>>> then the required clustering would be by day, category. A required
>>>>> sort might be day ASC, category DESC, name ASC. Because that sort
>>>>> satisfies the required clustering, it could be used for a global ordering.
>>>>> But, is that useful? How would the global ordering matter beyond a sort
>>>>> within each partition, i.e., how would the partition’s place in the global
>>>>> ordering be passed?
>>>>>
>>>>> To your other questions, you might want to have a look at the recent
>>>>> SPIP I’m working on to consolidate and clean up logical plans
>>>>> <https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d>.
>>>>> That proposes more specific uses for the DataSourceV2 API that should help
>>>>> clarify what validation needs to take place. As for custom catalyst rules,
>>>>> I’d like to hear about the use cases to see if we can build it into these
>>>>> improvements.
>>>>>
>>>>> rb
>>>>>
>>>>>
>>>>> On Mon, Mar 26, 2018 at 8:40 AM, Patrick Woody <
>>>>> patrick.woo...@gmail.com> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> I saw in some of the discussions around DataSourceV2 writes that we
>>>>>> might have the data source inform Spark of requirements for the input
>>>>>> data's ordering and partitioning. Has there been a proposed API for that
>>>>>> yet?
>>>>>>
>>>>>> Even one level up it would be helpful to understand how I should be
>>>>>> thinking about the responsibility of the data source writer, when I
>>>>>> should be inserting a custom catalyst rule, and how I should handle
>>>>>> validation/assumptions of the table before attempting the write.
>>>>>>
>>>>>> Thanks!
>>>>>> Pat
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Ryan Blue
Software Engineer
Netflix
