The link from Anton does have a good description of the Iceberg side. And I have a little more detail on the Spark side now.
Spark has _very_ limited support for bucketed joins. The only thing it can do is avoid a shuffle for data that was written out using Spark's internal hash partitioning function, by gathering all of the files for a particular hash into one partition. If the number of buckets doesn't match the join parallelism, this doesn't work, and Spark still can't skip the sort phase of a sort-merge join. So Spark is going to need quite a bit of work to support bucketed joins. I'm hoping that we can get it working in Spark 3.1.

On Wed, Dec 4, 2019 at 5:15 AM Anton Okolnychyi <[email protected]> wrote:

> Hi,
>
> Bucketed joins are on the roadmap. I think [1] gives a pretty good summary
> of how that should look.
> I believe the only remaining part in Iceberg is to add the sort spec (in
> progress). Then we can switch to the Spark part.
>
> -- Anton
>
> [1] -
> https://github.com/apache/incubator-iceberg/issues/430#issuecomment-533360026
>
> > On 26 Nov 2019, at 21:17, suds <[email protected]> wrote:
> >
> > I looked at the open issue and discussion around the sort spec:
> > https://github.com/apache/incubator-iceberg/issues/317
> >
> > For now we have added a sort spec external to Iceberg and made it work by
> > adding additional logic to sort the dataframe before writing to the
> > Iceberg table (it's a hack until the above issue gets resolved).
> >
> > I am trying to see if I can use the sorted data to somehow hint to the
> > join operation that the data is presorted.
> >
> > The v1 datasource has the ability to pass a BucketSpec, and Hive and
> > Spark bucketed tables use this feature so that the join operation can use
> > a sort-merge join and no additional sort step is needed.
> >
> > class HadoopFsRelation(
> >     location: FileIndex,
> >     partitionSchema: StructType,
> >     dataSchema: StructType,
> >     bucketSpec: Option[BucketSpec],
> >     fileFormat: FileFormat,
> >     options: Map[String, String])(val sparkSession: SparkSession)
> >   extends BaseRelation with FileRelation
> >
> > Has anyone on this forum looked into the V2 API and how a similar hint
> > can be passed? I can work on creating a proof-of-concept PR for the sort
> > spec, but I am not able to find support for a sort spec in the V2 API.
> >
> > I also tried another hack using the following code, which seems to show
> > that a sort-merge join is used, but for some reason the sort within each
> > partition is taking too long (assuming Spark uses timsort, I was
> > expecting it to be a no-op):
> >
> > val df1 =
> >   readIcebergTable("table1").sortWithinPartitions(col("col1")).cache()
> >
> > val df2 =
> >   readIcebergTable("table2").sortWithinPartitions(col("col1")).cache()
> >
> > val finalDF = df1.join(df2, df1("col1") === df2("col1"))
> >
> > Any suggestions to make the join work without an additional sort?
> >
> > --
> > Thanks

-- 
Ryan Blue
Software Engineer
Netflix
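For readers following the thread, the mechanics Ryan describes — why a bucketed, presorted table lets a sort-merge join skip both the shuffle and the sort, and why the bucket counts on both sides must line up — can be sketched as a toy model. This is plain Python of my own construction, not Spark's implementation: the function and variable names are hypothetical, and Spark really uses a Murmur3-based hash, not Python's built-in `hash`.

```python
# Toy model of a bucketed sort-merge join. Illustrative only -- this is
# NOT Spark's code; names and structure are invented for the sketch.

NUM_BUCKETS = 4

def bucket_of(key, num_buckets=NUM_BUCKETS):
    # Stand-in for Spark's internal hash partitioning function
    # (Spark actually uses Murmur3). Any deterministic hash works here.
    return hash(key) % num_buckets

def write_bucketed(rows, num_buckets=NUM_BUCKETS):
    # Write-time work: group rows into buckets by join key and sort each
    # bucket, like a bucketed + sorted table layout.
    buckets = [[] for _ in range(num_buckets)]
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)].append((key, value))
    for b in buckets:
        b.sort(key=lambda kv: kv[0])
    return buckets

def bucketed_join(left_buckets, right_buckets):
    # Key i on the left can only match inside bucket i on the right, so
    # each bucket pair joins independently: no shuffle is needed. If the
    # bucket counts differ, this alignment breaks and a shuffle is the
    # only way to co-locate matching keys.
    assert len(left_buckets) == len(right_buckets), \
        "bucket counts must match, otherwise a shuffle is required"
    out = []
    for lb, rb in zip(left_buckets, right_buckets):
        # Merge phase of a sort-merge join. Both buckets are already
        # sorted from write time, so no extra sort step runs here.
        i = j = 0
        while i < len(lb) and j < len(rb):
            lk, lv = lb[i]
            rk, _ = rb[j]
            if lk < rk:
                i += 1
            elif lk > rk:
                j += 1
            else:
                # Emit all right-side rows sharing this key, then advance
                # the left pointer (handles duplicate keys on either side).
                j2 = j
                while j2 < len(rb) and rb[j2][0] == lk:
                    out.append((lk, lv, rb[j2][1]))
                    j2 += 1
                i += 1
    return out

left = write_bucketed([("a", 1), ("b", 2), ("c", 3)])
right = write_bucketed([("a", 10), ("c", 30), ("d", 40)])
print(sorted(bucketed_join(left, right)))  # [('a', 1, 10), ('c', 3, 30)]
```

The per-bucket merge is exactly the part a V2 sort-spec hint would let Spark reach directly; without the hint, Spark conservatively re-sorts each partition before merging even when the data is already ordered.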
