We already do this in Flink SQL, where there are multiple optimization
programs, each for a specific purpose; we also have our own logical and
physical conventions.
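The "multiple optimize programs" idea can be sketched generically: each phase is a self-contained program, and the plan produced by one phase feeds the next. Below is a minimal illustration in plain Java, using a toy string-based "plan"; this is not Flink's or Calcite's actual API, and the phase names are invented:

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class PhasedOptimizer {
    // A "program" is just a transformation over a toy plan string here;
    // in a real system each phase would rewrite a RelNode tree.
    static String optimize(String plan, List<UnaryOperator<String>> programs) {
        for (UnaryOperator<String> program : programs) {
            plan = program.apply(plan); // each phase sees the previous phase's output
        }
        return plan;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> programs = List.of(
            p -> p.replace("LogicalFilter", "PushedFilter"), // phase 1: filter push-down
            p -> p.replace("LogicalJoin", "HashJoin")        // phase 2: physical join selection
        );
        String result = optimize("LogicalJoin(LogicalFilter(Scan))", programs);
        System.out.println(result); // prints HashJoin(PushedFilter(Scan))
    }
}
```

Because each phase is isolated, you can insert a phase that consults runtime statistics between two purely rule-based phases, which is the multi-phase shape discussed in this thread.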

Julian Hyde <[email protected]> wrote on Tue, Mar 10, 2020 at 4:30 AM:

> It's difficult to do "split processing into phases" entirely within
> Calcite. Generally a DBMS would manage these phases, and call Calcite
> at each step.
>
> I'd love to have some working code in Calcite that does this, but we'd
> be stepping over the line from "framework" into "platform". It's
> difficult to get contributions for these things.
>
> A more realistic ask is this: If anyone has implemented multi-phase
> optimization with Calcite, please write a blog post or a conference
> talk and share what you learned. Include a pointer to your code if
> your project is open source.
>
> Julian
>
> On Fri, Mar 6, 2020 at 7:16 PM Yang Liu <[email protected]> wrote:
> >
> > Thanks all!
> >
> > @Julian, is the “split processing into phases” you are referring to
> > something like this?
> >
> > with t1 as (select * from es_table where xxx limit xxx)
> > select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
> > from t1)
> >
> > which means the SQL writer needs to adapt to this specific form of SQL for
> > better performance? And Calcite will cache t1, right?
> >
> > Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to have
> > the specific rule that the query result of the right table can be used as a
> > filter for the left table?
> >
> > Thanks!
> >
> >
> > Julian Hyde <[email protected]> wrote on Sat, Mar 7, 2020 at 1:48 AM:
> >
> > > Runtime optimization is always necessary, because you just don’t have the
> > > stats until you run the query. The best DB algorithms are adaptive, and
> > > therefore hard to write. The adaptations require a lot of tricky support
> > > from the runtime - e.g. propagating Bloom filters against the flow of
> > > data.
> > >
> > > Calcite can still help a little.
> > >
> > > One runtime optimization is where you split processing into phases. Only
> > > optimize the first part of your query. Build temp tables, analyze them,
> > > and use those stats to optimize the second part of your query.
> > >
> > > Another technique is to gather stats as you run the query today, so that
> > > when you run it tomorrow Calcite can do a better job.
> > >
> > > Julian
> > >
> > >
> > > > On Mar 6, 2020, at 5:52 AM, Danny Chan <[email protected]> wrote:
> > > >
> > > > Sorry to say that the Calcite runtime does not support this. "Dynamic
> > > > partition pruning", called a "runtime filter" in Impala, builds a Bloom
> > > > filter from the join keys of the build-side table and pushes it down to
> > > > the probe-side table source; in some cases this can reduce the data
> > > > read.
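The runtime-filter technique described above can be sketched in plain Java. This is an illustration only, with a toy Bloom filter and hypothetical join keys, not Impala's or Calcite's implementation:

```java
import java.util.BitSet;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of a "runtime filter": build a Bloom filter over the build-side
// join keys, then prune probe-side rows before the join even sees them.
public class RuntimeFilterSketch {
    static final int BITS = 1 << 16; // illustrative size, not tuned

    // Build the filter from the build-side keys (two cheap hash positions per key).
    static BitSet build(List<String> buildKeys) {
        BitSet bloom = new BitSet(BITS);
        for (String key : buildKeys) {
            bloom.set(Math.floorMod(key.hashCode(), BITS));
            bloom.set(Math.floorMod(key.hashCode() * 31 + 17, BITS));
        }
        return bloom;
    }

    // May report false positives, never false negatives, so the join stays correct.
    static boolean mightContain(BitSet bloom, String key) {
        return bloom.get(Math.floorMod(key.hashCode(), BITS))
            && bloom.get(Math.floorMod(key.hashCode() * 31 + 17, BITS));
    }

    public static void main(String[] args) {
        BitSet bloom = build(List.of("alice@example", "bob@example")); // hypothetical keys
        List<String> probe = List.of("alice@example", "carol@example", "bob@example");
        // Only rows that *might* match survive; the join re-checks exact equality.
        List<String> pruned = probe.stream()
            .filter(k -> mightContain(bloom, k))
            .collect(Collectors.toList());
        System.out.println(pruned);
    }
}
```

In a real engine the filter would be serialized and pushed into the probe-side scan (e.g. a JDBC or Elasticsearch source) so the pruning happens before data crosses the network.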
> > > >
> > > > Yang Liu <[email protected]> wrote on Fri, Mar 6, 2020 at 6:54 PM:
> > > >
> > > >> Discussed with one of our user groups: in Spark 3.0, this is called
> > > >> "dynamic partition pruning".
> > > >>
> > > >> Yang Liu <[email protected]> wrote on Fri, Mar 6, 2020 at 6:12 PM:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> I am wondering if Calcite will support "lazy optimization" (execution
> > > >>> time optimization / runtime optimization).
> > > >>>
> > > >>> For example, we want to do an inner join between an Elasticsearch
> > > >>> table and a MySQL table, like this:
> > > >>>
> > > >>> WITH logic_table_2 AS
> > > >>>  (SELECT _MAP['status'] AS "status",
> > > >>>          _MAP['user'] AS "user"
> > > >>>   FROM "es"."insight-by-sql-v3"
> > > >>>   LIMIT 12345)
> > > >>> SELECT *
> > > >>> FROM "insight_user"."user_tab" AS t1
> > > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > > >>> WHERE t2."status" = 'fail'
> > > >>> LIMIT 10
> > > >>>
> > > >>> t2 is an ES table and t1 is a MySQL table, and it may generate an
> > > >>> execution plan like this:
> > > >>>
> > > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > > >>>     is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > > >>>     user=[$1])
> > > >>>   EnumerableLimit(fetch=[10])
> > > >>>     EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > > >>>       ElasticsearchToEnumerableConverter
> > > >>>         ElasticsearchProject(status=[ITEM($0, 'status')],
> > > >>>             user=[ITEM($0, 'user')])
> > > >>>           ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> > > >>>             ElasticsearchSort(fetch=[12345])
> > > >>>               ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> > > >>>       JdbcToEnumerableConverter
> > > >>>         JdbcTableScan(table=[[insight_user, user_tab]])
> > > >>>
> > > >>> Since the ES query here has a filter, at execution time Calcite will
> > > >>> run the ES query first to get the build table, then do the
> > > >>> JdbcTableScan to get the probe table, and finally do the HashJoin.
> > > >>>
> > > >>> But since this is an INNER JOIN, there is an implicit filter on the
> > > >>> later JdbcTableScan:
> > > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```
> > > >>> If we applied this implicit filter, the dataset we handle could become
> > > >>> extremely small (saving memory) and the query would run much faster,
> > > >>> since the full JdbcTableScan is always time-consuming. But because
> > > >>> Calcite does its optimization in the planner phase, this dynamic/lazy
> > > >>> optimization seems to be missed ...
> > > >>>
> > > >>> To summarize, serial execution with a "lazy optimization" may be
> > > >>> faster and use less memory than parallel execution with an optimized
> > > >>> execution plan, since the former can reduce the dataset we handle.
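One way to apply that implicit filter by hand is a two-step serial execution: run the ES side first, then push its join keys into the JDBC query as an IN list. A hypothetical sketch follows; the `probeQuery` helper and the key values are invented, while the table and column names come from the example query above:

```java
import java.util.List;
import java.util.stream.Collectors;

// Two-phase semi-join reduction: the build side runs first, and its join
// keys rewrite the probe-side scan into a filtered scan.
public class SemiJoinRewrite {
    // Turn the build-side keys into a probe-side SQL query with an IN list.
    static String probeQuery(List<String> buildKeys) {
        String inList = buildKeys.stream()
            .map(k -> "'" + k + "'")
            .collect(Collectors.joining(", "));
        return "SELECT * FROM \"insight_user\".\"user_tab\""
            + " WHERE \"email\" IN (" + inList + ")";
    }

    public static void main(String[] args) {
        // Phase 1 (assumed done elsewhere): users of ES rows with status = 'fail'.
        List<String> buildKeys = List.of("alice@example", "bob@example");
        // Phase 2: the full JdbcTableScan becomes a filtered scan.
        System.out.println(probeQuery(buildKeys));
    }
}
```

A production version would also need to handle a large build side (fall back to a full scan or a Bloom filter when the IN list grows too big) and to quote keys safely rather than concatenating strings.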
> > > >>>
> > > >>> Any ideas?
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > >
> > >
>
