Hi Julian,

Makes sense. So adding a new RelDistribution type requires a strong reason. I have created a JIRA [1] to track this requirement.

[1] https://issues.apache.org/jira/browse/CALCITE-4957

Best,
Jing Zhang

Julian Hyde <[email protected]> wrote on Wed, Dec 22, 2021 at 08:04:

> I think you should contribute a change that adds a new value to the enum.
> I know that enums are not easily extensible, but in cases like this, that
> can be a feature rather than a bug.
>
> There are not very many distribution types, and new distribution types are
> rarely invented. Requiring people to contribute to the enum is a useful
> forcing function: the various groups who use Calcite are forced to describe
> their use cases, and when people discover that they have the same use
> cases, we tend to get reusable code.
>
> Converting an enum to an interface makes things a lot less concrete. It is
> more difficult to reason about a piece of code, and there are bugs because
> you can’t review a ‘switch’ expression and say ‘yes, that covers all cases’.
>
> Julian
>
> > On Dec 21, 2021, at 12:33 AM, Jing Zhang <[email protected]> wrote:
> >
> > Hi community,
> > I hope to extend `RelDistribution` to support more distribution types in
> > order to solve data skew in the normal hash distribution.
> >
> > When we use hash distribution to bring all records with the same hash key
> > to the same place, job performance is poor if there are hot keys.
> > One solution to this problem is to send a hot key to one of several
> > downstream tasks, chosen at random.
> > In HashJoin, we could use random hash partitioning on one side; for the
> > other input to the join, records relating to the hot key need to be
> > replicated to all downstream tasks handling that key.
> > In HashAggregate, we could split the aggregate into partial-final if all
> > the aggregation functions support splitting.
> > The 10th chapter of the book "Designing Data-Intensive Applications" also
> > describes this solution to data skew.
> >
> > Anyway, we should first extend `RelDistribution` to support more
> > distribution types, for example, a hash random type.
> > However, `RelDistribution.Type` is an enum, which is not extensible.
> > I would not add the new types to the enum `RelDistribution.Type` directly.
> > I prefer to refactor `RelDistribution.Type` to make it extensible
> > and add the new types in a subclass in the external execution engine
> > (e.g. Flink).
> >
> > For example, there is a lookup join in Flink [1]. It is typically used to
> > enrich a table with data that is queried from an external system.
> > For the following query
> >> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> >> from default_catalog.default_database.probe as p
> >> join partition_table_3 for system_time as of p.proc_time as b
> >> on p.x=b.x and p.y=b.y
> >
> > With normal hash distribution, the logical plan is as follows:
> > +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> >    +- Exchange(distribution=[hash[x, y]])
> >       +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> >
> > If the data_skew solution is enabled via a hint, the logical plan is as follows:
> > +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> >    +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
> >       +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> >
> > What do you think?
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> >
> > Best,
> > Jing Zhang
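
For readers of the archive, the hot-key routing described in the original proposal could look roughly like the sketch below. It is only an illustration under stated assumptions: `HashRandomRouter`, `routeSkewedSide` and `routeReplicatedSide` are hypothetical names, not Calcite or Flink APIs, and `bucketNum` is assumed to play the role of `bucket_num` in the `hash_random` plan above. The skewed input sends each record to one of `bucketNum` candidate tasks chosen at random, and the other join input replicates its record to all of those candidates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;

/** Hypothetical helper for illustration only; not part of Calcite or Flink. */
final class HashRandomRouter {
  private final int numTasks;   // number of downstream tasks
  private final int bucketNum;  // how many tasks share one key (assumed meaning of bucket_num)
  private final Random random;

  HashRandomRouter(int numTasks, int bucketNum, Random random) {
    if (bucketNum < 1 || bucketNum > numTasks) {
      throw new IllegalArgumentException("bucketNum must be in [1, numTasks]");
    }
    this.numTasks = numTasks;
    this.bucketNum = bucketNum;
    this.random = random;
  }

  /** Task that a plain hash distribution would pick for this key. */
  private int baseTask(Object... key) {
    return Math.floorMod(Objects.hash(key), numTasks);
  }

  /** Skewed side: pick one of the bucketNum candidate tasks at random. */
  int routeSkewedSide(Object... key) {
    int salt = random.nextInt(bucketNum);
    return (baseTask(key) + salt) % numTasks;
  }

  /** Other join side: replicate the record to every candidate task for the key. */
  List<Integer> routeReplicatedSide(Object... key) {
    int base = baseTask(key);
    List<Integer> targets = new ArrayList<>(bucketNum);
    for (int salt = 0; salt < bucketNum; salt++) {
      targets.add((base + salt) % numTasks);
    }
    return targets;
  }
}
```

With `bucketNum` set to 1 this degenerates to ordinary hash distribution, so the extra replication cost is only paid when the skew handling is switched on.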
