We already did this in Flink SQL, where there are multiple optimization programs, each for a specific purpose; we also have our own logical and physical conventions.
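For context, a minimal sketch of how purpose-specific optimization programs can be chained with Calcite's Programs API (this is not Flink's actual pipeline; the rule choices below are just illustrative assumptions, written against a recent Calcite version):

    import java.util.Arrays;

    import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
    import org.apache.calcite.rel.rules.CoreRules;
    import org.apache.calcite.tools.Program;
    import org.apache.calcite.tools.Programs;

    /** Hypothetical multi-phase optimization pipeline, in the spirit of
     * Flink SQL's chained optimize programs. Rule choices are illustrative. */
    public final class PhasedOptimization {

      /** Builds a pipeline of purpose-specific programs that run in sequence. */
      public static Program pipeline() {
        // Phase 1: heuristic logical rewrites, run on the HepPlanner.
        Program logicalRewrite = Programs.hep(
            Arrays.asList(CoreRules.FILTER_INTO_JOIN,
                CoreRules.FILTER_PROJECT_TRANSPOSE,
                CoreRules.PROJECT_MERGE),
            /* noDag= */ true,
            DefaultRelMetadataProvider.INSTANCE);

        // Phase 2: cost-based physical planning with Calcite's standard rule set.
        Program physical = Programs.standard();

        // Phases run in order; each could target its own convention.
        return Programs.sequence(logicalRewrite, physical);
      }

      private PhasedOptimization() {}
    }

Each phase can also be registered via Frameworks.newConfigBuilder().programs(...) and invoked separately with Planner.transform, which is how a driving system would interleave execution between phases.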
Julian Hyde <[email protected]>于2020年3月10日 周二上午4:30写道: > It's difficult to do "split processing into phases" entirely within > Calcite. Generally a DBMS would manage these phases, and call Calcite > at each step. > > I'd love to have some working code in Calcite that does this, but we'd > be stepping over the line from "framework" into "platform". It's > difficult to get contributions for these things. > > A more realistic ask is this: If anyone has implemented multi-phase > optimization with Calcite, please write a blog post or a conference > talk and share what you learned. Include a pointer to your code if > your project is open source. > > Julian > > On Fri, Mar 6, 2020 at 7:16 PM Yang Liu <[email protected]> wrote: > > > > Thanks all! > > > > @Julian is the “split processing into phases” you are referring to like > > this? > > > > with t1 as (select * from es_table where xxx limit xxx); > > select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key > > from t1) > > > > which means the SQL writer need to adapt to this specific form of SQL for > > better performance? And Calcite will cache the t1 right? > > > > Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to > have > > the specific rule: the query result of right table can be used as filters > > for the left table? > > > > Thanks! > > > > > > Julian Hyde <[email protected]> 于2020年3月7日周六 上午1:48写道: > > > > > Runtime optimization is always necessary, because you just don’t have > the > > > stats until you run the query. The best DB algorithms are adaptive, and > > > therefore hard to write. The adaptations require a lot of tricky > support > > > from the runtime - e.g. propagating bloom filters against the flow of > data. > > > > > > Calcite can still help a little. > > > > > > One runtime optimization is where you split processing into phases. > Only > > > optimize the first part of your query. Build temp tables, analyze > them, and > > > use those stats to optimize the second part of your query. > > > > > > Another technique is to gather stats when as you run the query today, > so > > > that when you run it tomorrow Calcite can do a better job. > > > > > > Julian > > > > > > > > > > On Mar 6, 2020, at 5:52 AM, Danny Chan <[email protected]> wrote: > > > > > > > > Sorry to tell that Calcite runtime does not support this, the > "dynamic > > > > partition pruning" or "runtime filter" called in Impala, would build > a > > > > bloom filter for the join keys for the build side table and push it > down > > > to > > > > the probe table source, thus, in some cases, it can reduce the data. > > > > > > > > Yang Liu <[email protected]> 于2020年3月6日周五 下午6:54写道: > > > > > > > >> discussed with one of our user groups, in Spark 3.0, this is called > > > >> "dynamic > > > >> partition pruning" > > > >> > > > >> Yang Liu <[email protected]> 于2020年3月6日周五 下午6:12写道: > > > >> > > > >>> Hi, > > > >>> > > > >>> I am wondering if Calcite will support "lazy optimization" > (execution > > > >> time > > > >>> optimization / runtime optimization). 
> > > >>> > > > >>> For example, we want to do an inner join between an Elasticsearch > table > > > >>> and a MySQL table, like this: > > > >>> > > > >>> WITH logic_table_2 AS > > > >>> (SELECT _MAP['status'] AS "status", > > > >>> _MAP['user'] AS "user" > > > >>> FROM "es"."insight-by-sql-v3" > > > >>> LIMIT 12345) > > > >>> SELECT * > > > >>> FROM "insight_user"."user_tab" AS t1 > > > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user" > > > >>> WHERE t2."status" = 'fail' > > > >>> LIMIT 10 > > > >>> > > > >>> t2 is a ES table and t1 is a MySQL table, and it may generate a > > > execution > > > >>> plan like this: > > > >>> > > > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5], > > > >>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0], > > > >>> user=[$1]) > > > >>> EnumerableLimit(fetch=[10]) > > > >>> EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner]) > > > >>> ElasticsearchToEnumerableConverter > > > >>> ElasticsearchProject(status=[ITEM($0, 'status')], > user=[ITEM($0, > > > >>> 'user')]) > > > >>> ElasticsearchFilter(condition=[=(ITEM($0, 'status'), > 'fail')]) > > > >>> ElasticsearchSort(fetch=[12345]) > > > >>> ElasticsearchTableScan(table=[[es, > insight-by-sql-v3]]) > > > >>> JdbcToEnumerableConverter > > > >>> JdbcTableScan(table=[[insight_user, user_tab]]) > > > >>> > > > >>> since here ES query has a filter, in execution Calcite will do the > ES > > > >>> query first and get the build table, and then do JdbcTableScan and > get > > > >> the > > > >>> probe table, and do the HashJoin finally. > > > >>> > > > >>> But, since this is a INNER JOIN, there is an implicit filter on the > > > later > > > >>> JdbcTableScan: > > > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```, > if > > > >>> applying this implicit filter, the dataset we will handle may > become > > > >>> extremely small (save memory) and running much faster since the > full > > > >>> JdbcTableScan is always time-wasting. But since Calcite do the > > > >> optimization > > > >>> in planner phase, this dynamic/lazy optimization seems missed ... > > > >>> > > > >>> To summarize, serial execution with a "lazy optimization" may be > faster > > > >>> and use less memory than parallel execution with an optimized > execution > > > >>> plan since the former one can reduce dataset we handle. > > > >>> > > > >>> Any ideas? > > > >>> > > > >>> > > > >>> > > > >> > > > > > > >
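To make the "runtime filter" idea from the quoted thread concrete, here is a rough hand-rolled sketch, outside of Calcite: execute the selective build side first, then push its distinct join keys into the probe-side scan as an IN list. The class and method names (RuntimeFilterJoinSketch, buildSideKeys, probeSide) are hypothetical, and the Elasticsearch build-side query is left as a placeholder:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.LinkedHashSet;
    import java.util.Set;
    import java.util.StringJoiner;

    /** Hypothetical runtime-filter join: run the selective build side first,
     * then scan only the matching probe-side rows. */
    public final class RuntimeFilterJoinSketch {

      /** Placeholder for the build side of the plan: the Elasticsearch query
       * (status = 'fail', LIMIT 12345) that returns the "user" values. */
      static Set<String> buildSideKeys() {
        Set<String> keys = new LinkedHashSet<>();
        // ... query "es"."insight-by-sql-v3" here and collect _MAP['user'] ...
        return keys;
      }

      /** Probe side: instead of a full scan of insight_user.user_tab, fetch
       * only rows whose email appears in the build-side key set. The join
       * (and the outer LIMIT 10) would still be applied on top of this. */
      static void probeSide(Connection mysql, Set<String> keys) throws SQLException {
        if (keys.isEmpty()) {
          return; // empty build side: an inner join can produce no rows at all
        }
        StringJoiner placeholders = new StringJoiner(", ", "(", ")");
        keys.forEach(k -> placeholders.add("?"));
        String sql = "SELECT * FROM insight_user.user_tab WHERE email IN " + placeholders;
        try (PreparedStatement ps = mysql.prepareStatement(sql)) {
          int i = 1;
          for (String key : keys) {
            ps.setString(i++, key);
          }
          try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
              // hash-join each probe row back to its matching build-side row here
            }
          }
        }
      }

      private RuntimeFilterJoinSketch() {}
    }

An IN list only scales to a modest number of build-side keys, which is why engines such as Impala and Spark ship a bloom filter to the probe side instead of a literal key list.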
