Hi Julian,
Makes sense.
Adding a new RelDistribution type then requires a strong reason.
I have created a JIRA [1] to track this requirement.

[1] https://issues.apache.org/jira/browse/CALCITE-4957

Best,
Jing Zhang

Julian Hyde <[email protected]> wrote on Wed, Dec 22, 2021 at 08:04:

> I think you should contribute a change that adds a new value to the enum.
> I know that enums are not easily extensible, but in cases like this, that
> can be a feature rather than a bug.
>
> There are not very many distribution types, and new distribution types are
> rarely invented. Requiring people to contribute to the enum is a useful
> forcing function: the various groups who use Calcite are forced to describe
> their use cases, and when people discover that they have the same use
> cases, we tend to get reusable code.
>
> Converting an enum to an interface makes things a lot less concrete. It is
> more difficult to reason about a piece of code, and there are bugs because
> you can’t review a ‘switch’ expression and say ‘yes, that covers all cases’.
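
To illustrate the point about reviewing a switch, here is a minimal Java sketch. `DistType`, `ExchangeCost`, and `copiesPerRow` are hypothetical names invented for this example; the enum is a simplified stand-in and is not Calcite's actual `RelDistribution.Type`.

```java
// Hypothetical, simplified stand-in for a closed set of distribution types.
// This is NOT Calcite's RelDistribution.Type; the names are assumptions.
enum DistType { SINGLETON, HASH, RANGE, RANDOM, BROADCAST }

class ExchangeCost {
  /**
   * Returns how many downstream tasks receive a copy of each row.
   * Because DistType is a closed enum, a reviewer can check the case
   * labels against the enum declaration and confirm nothing is missing.
   */
  static int copiesPerRow(DistType type, int parallelism) {
    switch (type) {
      case SINGLETON:
      case HASH:
      case RANGE:
      case RANDOM:
        return 1;            // each row goes to exactly one task
      case BROADCAST:
        return parallelism;  // each row is copied to every task
      default:
        // Reached only if a new enum value is added without updating this method.
        throw new AssertionError("unhandled distribution type: " + type);
    }
  }
}
```

If the type were an open interface instead, an engine could supply an implementation that no existing switch anticipates, and the `default` branch would become the normal way to find out. With Java 14 or later, writing this as a switch expression without a `default` branch makes the compiler reject a missing enum constant.
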
>
> Julian
>
> > On Dec 21, 2021, at 12:33 AM, Jing Zhang <[email protected]> wrote:
> >
> > Hi community,
> > I hope to extend `RelDistribution` to support more distribution types in
> > order to solve data skew in the normal hash distribution.
> >
> > When we use hash distribution to bring all records with the same hash key
> > to the same place, job performance is poor if there are hot keys.
> > One solution is to send each record with a hot key to one of several
> > downstream tasks, chosen at random.
> > In HashJoin, we could use random hash partitioning on one side; for the
> > other input to the join, records relating to the hot key need to be
> > replicated to all downstream tasks handling that key.
> > In HashAggregate, we could split the aggregate into partial and final
> > phases if all the aggregation functions support splitting.
> > Chapter 10 of the book "Designing Data-Intensive Applications" also
> > describes this solution to data skew.
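
The join case described above can be sketched in Java as follows. This is only an illustration under stated assumptions: `SkewAwarePartitioner`, `routeRandom`, `routeReplicated`, and the `numTasks`/`bucketNum` parameters are hypothetical names, and the code is not taken from Calcite or Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of "hash random" routing; not Calcite or Flink code.
class SkewAwarePartitioner {
  private final int numTasks;   // number of downstream tasks (assumed parameter)
  private final int bucketNum;  // how many buckets one key may be spread over

  SkewAwarePartitioner(int numTasks, int bucketNum) {
    this.numTasks = numTasks;
    this.bucketNum = bucketNum;
  }

  /** Maps a (key, salt) pair to a downstream task index in [0, numTasks). */
  int taskFor(Object key, int salt) {
    return Math.floorMod(Objects.hash(key, salt), numTasks);
  }

  /** Skewed side: pick a random salt, so one key spreads over up to bucketNum tasks. */
  int routeRandom(Object key) {
    int salt = ThreadLocalRandom.current().nextInt(bucketNum);
    return taskFor(key, salt);
  }

  /** Other join input: replicate the record to every task that may hold the key. */
  List<Integer> routeReplicated(Object key) {
    List<Integer> tasks = new ArrayList<>();
    for (int salt = 0; salt < bucketNum; salt++) {
      int task = taskFor(key, salt);
      if (!tasks.contains(task)) {
        tasks.add(task);  // skip duplicates when two salts hash to the same task
      }
    }
    return tasks;
  }
}
```

The join stays correct because every task that `routeRandom` can choose for a key is also in the list returned by `routeReplicated` for that key. The price is that the replicated side sends up to `bucketNum` copies of each record. This sketch salts every key, whereas the description above replicates only the records relating to hot keys.
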
> >
> > Anyway, we should first extend `RelDistribution` to support more
> > distribution types, for example, a hash random type.
> > However, `RelDistribution.Type` is an enum class, which is not extensible.
> > I would rather not add the new types to the enum `RelDistribution.Type`
> > directly.
> > I prefer to refactor `RelDistribution.Type` to make it extensible
> > and add the new types in a subclass in the external execution engine
> > (e.g. Flink).
> >
> > For example, there is a lookup join in Flink [1], which is typically used
> > to enrich a table with data that is queried from an external system.
> > For the following query
> >> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> >> from default_catalog.default_database.probe as p
> >> join partition_table_3 for system_time as of p.proc_time as b
> >> on p.x=b.x and p.y=b.y
> >
> > When using normal hash distribution, the logical plan is as follows:
> >   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> >      +- Exchange(distribution=[hash[x, y]])
> >         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> >
> > If the data_skew solution is enabled via a hint, the logical plan is as follows:
> >   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> >      +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
> >         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> >
> > What do you think?
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> >
> > Best,
> > Jing Zhang
>
>
