Hi community,
I would like to extend `RelDistribution` to support more distribution types
in order to address data skew in normal hash distribution.

When we use hash distribution to bring all records with the same hash key
to the same place, job performance can be poor if there are hot keys.
One solution is to send the records of a hot key to one of several
downstream tasks, chosen at random.
In a HashJoin, we could use random hash partitioning on one side; for the
other input to the join, records relating to the hot key need to be
replicated to all downstream tasks handling that key.
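To make the HashJoin idea concrete, here is a minimal Java sketch, assuming
the set of hot keys is already known and each hot key is spread over a fixed
group of `fanOut` consecutive tasks. `SkewedJoinRouter`, `fanOut` and this
routing scheme are hypothetical illustrations, not an existing Calcite or
Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;

/**
 * Hypothetical routing for a hash join with known hot keys.
 * Probe (skewed) side: a hot-key record goes to ONE random task of the key's group.
 * Build side: a hot-key record is replicated to ALL tasks of the key's group.
 */
final class SkewedJoinRouter {
  private final int parallelism;   // number of downstream join tasks
  private final int fanOut;        // tasks sharing one hot key (assumption)
  private final Set<Object> hotKeys;
  private final Random random;

  SkewedJoinRouter(int parallelism, int fanOut, Set<Object> hotKeys, long seed) {
    if (fanOut < 1 || fanOut > parallelism) {
      throw new IllegalArgumentException("fanOut must be in [1, parallelism]");
    }
    this.parallelism = parallelism;
    this.fanOut = fanOut;
    this.hotKeys = hotKeys;
    this.random = new Random(seed);
  }

  /** The task that plain hash distribution would pick for this key. */
  private int home(Object key) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  /** Probe side: non-hot keys use plain hashing, hot keys pick one task of the group. */
  int routeProbe(Object key) {
    int base = home(key);
    if (!hotKeys.contains(key)) {
      return base;
    }
    return (base + random.nextInt(fanOut)) % parallelism;
  }

  /** Build side: non-hot keys go to one task, hot keys to every task of the group. */
  List<Integer> routeBuild(Object key) {
    int base = home(key);
    int copies = hotKeys.contains(key) ? fanOut : 1;
    List<Integer> targets = new ArrayList<>();
    for (int i = 0; i < copies; i++) {
      targets.add((base + i) % parallelism);
    }
    return targets;
  }
}
```

Because the build side copies a hot-key record to every task in the group
and the probe side picks exactly one task from the same group, every
matching pair still meets in exactly one task; non-hot keys keep ordinary
hash routing.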
In a HashAggregate, we could split the aggregate into a partial phase and a
final phase if all the aggregation functions support splitting.
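For the HashAggregate case, here is a small Java sketch of the partial/final
split for AVG: the partial phase keeps SUM and COUNT per key, and the final
phase merges those accumulators and computes the average. `TwoPhaseAvg` and
its methods are hypothetical names; a real engine would do this through its
own accumulator interfaces.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical two-phase AVG: partial (SUM, COUNT) per task, then a final merge. */
final class TwoPhaseAvg {
  /** Partial accumulator for one key. */
  static final class Acc {
    long sum;
    long count;
  }

  /** Partial phase: aggregate only the rows this task received (possibly a random share of a hot key). */
  static Map<String, Acc> partial(List<Map.Entry<String, Long>> rows) {
    Map<String, Acc> accs = new HashMap<>();
    for (Map.Entry<String, Long> row : rows) {
      Acc acc = accs.computeIfAbsent(row.getKey(), k -> new Acc());
      acc.sum += row.getValue();
      acc.count++;
    }
    return accs;
  }

  /** Final phase: merge the partial accumulators of each key and finish AVG. */
  static Map<String, Double> finalPhase(List<Map<String, Acc>> partials) {
    Map<String, Acc> merged = new HashMap<>();
    for (Map<String, Acc> p : partials) {
      for (Map.Entry<String, Acc> e : p.entrySet()) {
        Acc acc = merged.computeIfAbsent(e.getKey(), k -> new Acc());
        acc.sum += e.getValue().sum;
        acc.count += e.getValue().count;
      }
    }
    Map<String, Double> result = new HashMap<>();
    merged.forEach((k, acc) -> result.put(k, (double) acc.sum / acc.count));
    return result;
  }
}
```

The final phase still has to hash by the group key, but it only receives
one accumulator per key from each partial task instead of every raw row, so
a hot key no longer overloads a single task.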
Chapter 10 of the book "Designing Data-Intensive Applications" also
describes this solution for data skew.

Anyway, we should first extend `RelDistribution` to support more
distribution types, for example, a hash random type.
However, `RelDistribution.Type` is an enum, which is not extensible.
I would rather not add the new types to the enum `RelDistribution.Type`
directly. Instead, I prefer to refactor `RelDistribution.Type` to make it
extensible, and add the new types in a subclass in the external execution
engine (e.g. Flink).
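A rough sketch of what an extensible type could look like: the built-in
types stay an enum but implement an open interface, and an engine adds its
own implementation. `DistributionType`, `StandardDistributionType` and
`HashRandomType` are hypothetical names for illustration only; the enum
constants are loosely modeled on the existing enum and are not a proposal
for the exact API.

```java
import java.util.Objects;

/** Hypothetical open interface replacing the closed enum. */
interface DistributionType {
  /** Short name used in plan output, e.g. "hash" or "hash_random". */
  String shortName();

  /** Whether rows are partitioned by key columns. */
  boolean isKeyed();
}

/** Built-in types remain an enum, but now implement the open interface. */
enum StandardDistributionType implements DistributionType {
  SINGLETON("single", false),
  HASH_DISTRIBUTED("hash", true),
  RANDOM_DISTRIBUTED("random", false),
  BROADCAST_DISTRIBUTED("broadcast", false);

  private final String shortName;
  private final boolean keyed;

  StandardDistributionType(String shortName, boolean keyed) {
    this.shortName = shortName;
    this.keyed = keyed;
  }

  @Override public String shortName() { return shortName; }
  @Override public boolean isKeyed() { return keyed; }
}

/** An engine-specific type (e.g. in Flink) added without touching the core enum. */
final class HashRandomType implements DistributionType {
  private final int bucketNum;

  HashRandomType(int bucketNum) {
    if (bucketNum < 1) throw new IllegalArgumentException("bucketNum must be >= 1");
    this.bucketNum = bucketNum;
  }

  int bucketNum() { return bucketNum; }

  @Override public String shortName() { return "hash_random"; }
  @Override public boolean isKeyed() { return true; }

  @Override public boolean equals(Object o) {
    return o instanceof HashRandomType && ((HashRandomType) o).bucketNum == bucketNum;
  }

  @Override public int hashCode() { return Objects.hash(shortName(), bucketNum); }
}
```

Once the type is a class rather than an enum constant, an engine-specific
type can also carry parameters such as `bucket_num`, although whether that
belongs in the type or in the `RelDistribution` itself is a separate design
question.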

For example, Flink has a lookup join, which is typically used to enrich a
table with data that is queried from an external system [1].
Consider the following query:
> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> from default_catalog.default_database.probe as p
> join partition_table_3 for system_time as of p.proc_time as b
> on p.x=b.x and p.y=b.y

With normal hash distribution, the logical plan is as follows:
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash[x, y]])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])

If the data skew solution is enabled via a hint, the logical plan is as
follows:
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
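Here is one possible way an exchange could interpret
`hash_random(key=[x, y], bucket_num=8)`, assuming each key is mapped to a
home channel by hash and then one of `bucket_num` consecutive channels is
chosen at random. `HashRandomSelector` is a hypothetical class, not Flink's
actual channel selector API. Since a lookup join looks up each probe row
independently, it should not need all rows of a key in one task for
correctness, so spreading a key this way presumably trades only some lookup
cache locality for better balance.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical selector for hash_random(key=[...], bucket_num=N):
 * a key's rows are spread over at most N consecutive channels
 * starting at the key's hash "home" channel.
 */
final class HashRandomSelector {
  private final int bucketNum;

  HashRandomSelector(int bucketNum) {
    if (bucketNum < 1) throw new IllegalArgumentException("bucketNum must be >= 1");
    this.bucketNum = bucketNum;
  }

  /** Returns a target channel in [0, numChannels) for a row's key fields. */
  int selectChannel(Object[] keyFields, int numChannels) {
    int home = Math.floorMod(Arrays.hashCode(keyFields), numChannels);
    // Never spread over more channels than actually exist.
    int spread = Math.min(bucketNum, numChannels);
    int offset = ThreadLocalRandom.current().nextInt(spread);
    return (home + offset) % numChannels;
  }
}
```

With `bucket_num=1` this degenerates to plain hash distribution, and with
`bucket_num` equal to the parallelism it behaves like random distribution.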

What do you think?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join

Best,
Jing Zhang
