I think you should contribute a change that adds a new value to the enum. I know that enums are not easily extensible, but in cases like this, that can be a feature rather than a bug.
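To make the trade-off concrete, here is a minimal sketch of what adding a value to the enum looks like. This is a simplified stand-in, not Calcite's actual `RelDistribution.Type` (the constant names and the `describe` method are invented for illustration); the point is that a new constant is visible at every `switch`, so a reviewer can check that all cases are covered.

```java
// Simplified stand-in for RelDistribution.Type (not Calcite's real enum).
enum DistributionType {
  SINGLETON,
  BROADCAST_DISTRIBUTED,
  HASH_DISTRIBUTED,
  HASH_RANDOM_DISTRIBUTED  // hypothetical new constant for the skew case
}

class DistributionDemo {
  // An exhaustive switch: adding HASH_RANDOM_DISTRIBUTED forces every
  // switch like this one to be revisited, which is exactly the forcing
  // function an open interface would lose.
  static String describe(DistributionType t) {
    switch (t) {
      case SINGLETON:               return "all rows on one node";
      case BROADCAST_DISTRIBUTED:   return "all rows on every node";
      case HASH_DISTRIBUTED:        return "rows hashed on keys";
      case HASH_RANDOM_DISTRIBUTED: return "hot keys scattered randomly";
      default: throw new AssertionError("unhandled type: " + t);
    }
  }
}
```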
There are not very many distribution types, and new distribution types are rarely invented. Requiring people to contribute to the enum is a useful forcing function: the various groups who use Calcite are forced to describe their use cases, and when people discover that they have the same use cases, we tend to get reusable code.

Converting an enum to an interface makes things a lot less concrete. It becomes more difficult to reason about a piece of code, and bugs creep in because you can't review a 'switch' expression and say 'yes, that covers all cases'.

Julian

> On Dec 21, 2021, at 12:33 AM, Jing Zhang <[email protected]> wrote:
>
> Hi community,
> I hope to extend `RelDistribution` to support more distribution types in
> order to solve data skew in the normal hash distribution.
>
> When we use hash distribution to bring all records with the same hash key
> to the same place, job performance is poor if there are hot keys.
> One solution to this problem is to send a hot key to one of several
> downstream tasks, chosen at random.
> In HashJoin, we could use random hash partitioning on one side; for the
> other input to the join, records relating to the hot key need to be
> replicated to all downstream tasks handling that key.
> In HashAggregate, we could split the aggregate into partial/final phases
> if all the aggregation functions support splitting.
> Chapter 10 of the book "Designing Data-Intensive Applications" also
> describes this solution to data skew.
>
> In any case, we should first extend `RelDistribution` to support more
> distribution types, for example a hash-random type.
> However, `RelDistribution.Type` is an enum class, which is not extensible.
> I would not add the new types to the enum `RelDistribution.Type` directly.
> I would prefer to refactor `RelDistribution.Type` to make it extensible,
> and to add the new types in a subclass in the external execution engine
> (e.g. Flink).
>
> For example, there is a lookup join in Flink [1]. It is typically used to
> enrich a table with data that is queried from an external system.
> Consider the following query:
>
>> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
>> from default_catalog.default_database.probe as p
>> join partition_table_3 for system_time as of p.proc_time as b
>> on p.x=b.x and p.y=b.y
>
> When using normal hash distribution, the logical plan is as follows:
>
> +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y],
>      select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
>    +- Exchange(distribution=[hash[x, y]])
>       +- TableSourceScan(table=[[default_catalog, default_database, probe,
>            source: [CollectionTableSource(x, y)]]], fields=[x, y])
>
> If the data-skew solution is enabled via a hint, the logical plan is as
> follows:
>
> +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y],
>      select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
>    +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
>       +- TableSourceScan(table=[[default_catalog, default_database, probe,
>            source: [CollectionTableSource(x, y)]]], fields=[x, y])
>
> What do you think?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>
> Best,
> Jing Zhang
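The hot-key routing Jing describes above can be sketched in a few lines. This is a hedged illustration, not Flink's actual partitioner API: the class name, the explicit hot-key set, and the channel arithmetic are all assumptions. It shows the two halves of the scheme: the skewed (probe) side scatters a hot key to one of `bucket_num` channels at random, while the other (build/lookup) side must replicate the same hot key to all of those channels so the join still sees every match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hedged sketch of hash_random routing (not Flink's real API).
// Hot keys on the probe side go to one of `bucketNum` channels at random;
// on the replicated side, the same key must go to all of those channels.
class HashRandomRouter {
  final Set<Integer> hotKeys;   // keys known (or detected) to be skewed
  final int bucketNum;          // how many channels a hot key is spread over
  final int parallelism;        // number of downstream channels
  final Random random = new Random();

  HashRandomRouter(Set<Integer> hotKeys, int bucketNum, int parallelism) {
    this.hotKeys = hotKeys;
    this.bucketNum = bucketNum;
    this.parallelism = parallelism;
  }

  // Base channel a key would hash to under plain hash distribution.
  int hashChannel(int key) {
    return Math.floorMod(Integer.hashCode(key), parallelism);
  }

  // Probe side: a hot key goes to one of bucketNum channels, chosen at
  // random; a normal key keeps its usual hash channel.
  int probeChannel(int key) {
    if (!hotKeys.contains(key)) {
      return hashChannel(key);
    }
    int bucket = random.nextInt(bucketNum);
    return Math.floorMod(hashChannel(key) + bucket, parallelism);
  }

  // Replicated side: a hot key is sent to every channel the probe side
  // might have chosen, so all matches are still found.
  List<Integer> buildChannels(int key) {
    List<Integer> out = new ArrayList<>();
    if (!hotKeys.contains(key)) {
      out.add(hashChannel(key));
    } else {
      for (int b = 0; b < bucketNum; b++) {
        out.add(Math.floorMod(hashChannel(key) + b, parallelism));
      }
    }
    return out;
  }
}
```

Note the invariant the `Exchange(distribution=[hash_random(...)])` node would have to guarantee: for every key, the probe side's chosen channel is always contained in the replicated side's channel set.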
