Two v2 sources may return different bucket IDs for the same value, and this
breaks the phase 1 split-wise join.

This is why the FunctionCatalog included a canonicalName method (docs
<https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/BoundFunction.java#L81-L96>).
That method returns an identifier that can be used to compare whether two
bucket function instances are the same.


   1. Can we apply this idea to partitioned file source tables
   (non-bucketed) as well?

What do you mean here? The design doc discusses transforms like days(ts)
that can be supported in the future. Is that what you’re asking about? Or
are you referring to v1 file sources? I think the goal is to support v2,
since v1 doesn’t have reliable behavior.

Note that the initial implementation goal is to support bucketing since
that’s an easier case because both sides have the same number of
partitions. More complex storage-partitioned joins can be implemented later.


   1. What if the table has many partitions? Shall we apply certain join
   algorithms in the phase 1 split-wise join as well? Or even launch a Spark
   job to do so?

I think that this proposal opens up a lot of possibilities, like what
you’re suggesting here. It is a bit like AQE. We’ll need to come up with
heuristics for choosing how and when to use storage partitioning in joins.
As I said above, bucketing is a great way to get started because it fills
an existing gap. More complex use cases can be supported over time.

Ryan

On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan <cloud0...@gmail.com> wrote:

> IIUC, the general idea is to let each input split report its partition
> value, and Spark can perform the join in two phases:
> 1. join the input splits from left and right tables according to their
> partitions values and join keys, at the driver side.
> 2. for each joined input splits pair (or a group of splits), launch a
> Spark task to join the rows.
>
> My major concern is about how to define "compatible partitions". Things
> like `days(ts)` are straightforward: the same timestamp value always
> results in the same partition value, in whatever v2 sources. `bucket(col,
> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
> sources may return different bucket IDs for the same value, and this breaks
> the phase 1 split-wise join.
>
> And two questions for further improvements:
> 1. Can we apply this idea to partitioned file source tables (non-bucketed)
> as well?
> 2. What if the table has many partitions? Shall we apply certain join
> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
> job to do so?
>
> Thanks,
> Wenchen
>
> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun <sunc...@apache.org> wrote:
>
>> Thanks Cheng for the comments.
>>
>> > Is migrating Hive table read path to data source v2, being a
>> prerequisite of this SPIP
>>
>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
>> Hive eventually moves to use V2 API. With that said, I think some of the
>> ideas could be useful for V1 Hive support as well. For instance, with the
>> newly proposed logic to compare whether output partitionings from both
>> sides of a join operator are compatible, we can have HiveTableScanExec to
>> report a different partitioning other than HashPartitioning, and
>> EnsureRequirements could potentially recognize that and therefore avoid
>> shuffle if both sides report the same compatible partitioning. In addition,
>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
>> the constraint for V1 bucket join so that the join keys do not necessarily
>> be identical to the bucket keys.
>>
>> > Would aggregate work automatically after the SPIP?
>>
>> Yes it will work as before. This case is already supported by
>> DataSourcePartitioning in V2 (see SPARK-22389).
>>
>> > Any major use cases in mind except Hive bucketed table?
>>
>> Our first use case is Apache Iceberg. In addition to that we also want to
>> add the support for Spark's built-in file data sources.
>>
>> Thanks,
>> Chao
>>
>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su <chen...@fb.com> wrote:
>>
>>> +1 for this. This is exciting movement to efficiently read bucketed
>>> table from other systems (Hive, Trino & Presto)!
>>>
>>>
>>>
>>> Still looking at the details but having some early questions:
>>>
>>>
>>>
>>>    1. Is migrating Hive table read path to data source v2, being a
>>>    prerequisite of this SPIP?
>>>
>>>
>>>
>>> Hive table read path is currently a mix of data source v1 (for Parquet &
>>> ORC file format only), and legacy Hive code path (HiveTableScanExec). In
>>> the SPIP, I am seeing we only make change for data source v2, so wondering
>>> how this would work with existing Hive table read path. In addition, just
>>> FYI, supporting writing Hive bucketed table is merged in master recently (
>>> SPARK-19256 <https://issues.apache.org/jira/browse/SPARK-19256> has
>>> details).
>>>
>>>
>>>
>>>    1. Would aggregate work automatically after the SPIP?
>>>
>>>
>>>
>>> Another major benefit for having bucketed table, is to avoid shuffle
>>> before aggregate. Just want to bring to our attention that it would be
>>> great to consider aggregate as well when doing this proposal.
>>>
>>>
>>>
>>>    1. Any major use cases in mind except Hive bucketed table?
>>>
>>>
>>>
>>> Just curious if there’s any other use cases we are targeting as part of
>>> SPIP.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Cheng Su
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From: *Ryan Blue <b...@apache.org>
>>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
>>> *To: *John Zhuge <jzh...@apache.org>
>>> *Cc: *Chao Sun <sunc...@apache.org>, Wenchen Fan <cloud0...@gmail.com>,
>>> Cheng Su <chen...@fb.com>, DB Tsai <dbt...@dbtsai.com>, Dongjoon Hyun <
>>> dongjoon.h...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>, Wenchen
>>> Fan <wenc...@databricks.com>, angers zhu <angers....@gmail.com>, dev <
>>> dev@spark.apache.org>, huaxin gao <huaxin.ga...@gmail.com>
>>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source
>>> V2
>>>
>>> Instead of commenting on the doc, could we keep discussion here on the
>>> dev list please? That way more people can follow it and there is more room
>>> for discussion. Comment threads have a very small area and easily become
>>> hard to follow.
>>>
>>>
>>>
>>> Ryan
>>>
>>>
>>>
>>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge <jzh...@apache.org> wrote:
>>>
>>> +1  Nicely done!
>>>
>>>
>>>
>>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun <sunc...@apache.org> wrote:
>>>
>>> Oops, sorry. I just fixed the permission setting.
>>>
>>>
>>>
>>> Thanks everyone for the positive support!
>>>
>>>
>>>
>>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>> +1 to this SPIP and nice writeup of the design doc!
>>>
>>>
>>>
>>> Can we open comment permission in the doc so that we can discuss details
>>> there?
>>>
>>>
>>>
>>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon <gurwls...@gmail.com>
>>> wrote:
>>>
>>> Seems making sense to me.
>>>
>>> Would be great to have some feedback from people such as @Wenchen Fan
>>> <wenc...@databricks.com> @Cheng Su <chen...@fb.com> @angers zhu
>>> <angers....@gmail.com>.
>>>
>>>
>>>
>>>
>>>
>>> On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun <dongjoon.h...@gmail.com>
>>> wrote:
>>>
>>> +1 for this SPIP.
>>>
>>>
>>>
>>> On Sun, Oct 24, 2021 at 9:59 AM huaxin gao <huaxin.ga...@gmail.com>
>>> wrote:
>>>
>>> +1. Thanks for lifting the current restrictions on bucket join and
>>> making this more generalized.
>>>
>>>
>>>
>>> On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue <b...@apache.org> wrote:
>>>
>>> +1 from me as well. Thanks Chao for doing so much to get it to this
>>> point!
>>>
>>>
>>>
>>> On Sat, Oct 23, 2021 at 11:29 PM DB Tsai <dbt...@dbtsai.com> wrote:
>>>
>>> +1 on this SPIP.
>>>
>>> This is a more generalized version of bucketed tables and bucketed
>>> joins which can eliminate very expensive data shuffles when joins, and
>>> many users in the Apache Spark community have wanted this feature for
>>> a long time!
>>>
>>> Thank you, Ryan and Chao, for working on this, and I look forward to
>>> it as a new feature in Spark 3.3
>>>
>>> DB Tsai  |  https://www.dbtsai.com/  |  PGP 42E5B25A8F7A82C1
>>>
>>> On Fri, Oct 22, 2021 at 12:18 PM Chao Sun <sunc...@apache.org> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Ryan and I drafted a design doc to support a new type of join: storage
>>> partitioned join which covers bucket join support for DataSourceV2 but is
>>> more general. The goal is to let Spark leverage distribution properties
>>> reported by data sources and eliminate shuffle whenever possible.
>>> >
>>> > Design doc:
>>> https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
>>> (includes a POC link at the end)
>>> >
>>> > We'd like to start a discussion on the doc and any feedback is welcome!
>>> >
>>> > Thanks,
>>> > Chao
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Ryan Blue
>>>
>>>
>>>
>>>
>>> --
>>>
>>> John Zhuge
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Ryan Blue
>>>
>>

-- 
Ryan Blue

Reply via email to