Hi Jeremy,

There are several solutions here. As Jacques mentioned, the most popular
one is to push the filter/project into the TableScan operator. The critical
observations are:

   1. SqlNode cannot be linked to RelNode and vice versa in the general
   case since an optimized relational plan may differ dramatically from the
   original one.
   2. Any rel tree has either TableScan or Values as leaf nodes, and
   TableScan contains table and column names. Therefore, it is possible to
   construct a valid query for the target backend by traversing the rel tree
   bottom-up. You may need to generate some intermediate aliases, but the
   original column names will be there.

With this in mind, there are two common approaches to operator pushdown.
If the underlying system has limited pushdown capabilities, you may flatten
multiple operators into a single one. An example is a custom TableScan
operator that (1) prunes unused columns by extracting the referenced fields
from the upper Project operator, and (2) restricts the returned rows via a
Filter pushdown. To achieve this, you can implement a custom TableScan
operator that contains an explicit set of returned columns and a filter.
Then you implement two rules: one to push the referenced attributes from
the parent Project into the scan, and one to push the Filter into the scan.
Finally, you may use either VolcanoPlanner or HepPlanner to execute these
rules. This approach is ubiquitous; see, for example,
the PushFilterIntoTableSourceScanRule [1] in Apache Flink. Sample custom
TableScan:

class CustomTableScan extends TableScan {
    List<Integer> projects; // Indexes of returned fields.
    RexNode filter;         // Optional predicate.
}

Consider the following query:
SELECT d+e
FROM t
WHERE c > 10

The initial logical plan would be:
Project[$3+$4]
  Filter[$2>10]
    TableScan[t, columns=[a,b,c,d,e]]

After the Filter pushdown, the plan would be:
Project[$3+$4]
  CustomTableScan[t, columns=[a,b,c,d,e], filter=[$2>10]]

After the project pushdown, the plan would be:
Project[$0+$1]
  CustomTableScan[t, columns=[a,b,c,d,e], filter=[$2>10], projects={$3,$4}]
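To make the two rewrites concrete, here is a toy, dependency-free sketch of
the flattening. This is not the Calcite rule API; all class and method names
below are invented for illustration:

```java
import java.util.*;

public class PushdownSketch {
    // Toy stand-in for the custom scan: a table, its columns, an optional
    // pushed-down filter, and optional projected field indexes.
    static class Scan {
        final String table;
        final List<String> columns;
        String filter;            // e.g. "$2>10"
        List<Integer> projects;   // e.g. [3, 4]

        Scan(String table, List<String> columns) {
            this.table = table;
            this.columns = columns;
        }

        @Override public String toString() {
            return "CustomTableScan[" + table + ", columns=" + columns
                + (filter == null ? "" : ", filter=[" + filter + "]")
                + (projects == null ? "" : ", projects=" + projects) + "]";
        }
    }

    // Rule 1: flatten Filter[pred] over a scan into the scan itself.
    static Scan pushFilter(String pred, Scan scan) {
        scan.filter = pred;
        return scan;
    }

    // Rule 2: flatten the field references of the parent Project into the
    // scan. After this, the parent Project must be rewritten against the
    // scan's new output, which is how $3+$4 becomes $0+$1 above.
    static Scan pushProject(List<Integer> refs, Scan scan) {
        scan.projects = refs;
        return scan;
    }

    public static void main(String[] args) {
        Scan scan = new Scan("t", Arrays.asList("a", "b", "c", "d", "e"));
        pushFilter("$2>10", scan);
        pushProject(Arrays.asList(3, 4), scan);
        System.out.println(scan);
        // CustomTableScan[t, columns=[a, b, c, d, e], filter=[$2>10], projects=[3, 4]]
    }
}
```

In a real Calcite rule, the matched Filter/Project would be replaced by the
new scan via the planner, rather than mutated in place as in this toy.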

Now, by looking at the contents of the CustomTableScan, you may deduce that
only columns $2, $3, and $4 are used, referring to c, d, and e,
respectively. By replacing the indexes with the real column names, you may
construct a valid query/request for the target system.
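That last step might look like the following standalone sketch. The helper
name and the hard-coded filter translation are made up; a real
implementation would walk the RexNode filter and read the names from the
scan's row type:

```java
import java.util.*;

public class QueryBuilder {
    // Collect every field index referenced by the flattened scan (from both
    // the filter and the projects) and map each back to a column name.
    static List<String> referencedColumns(List<String> columns,
                                          Collection<Integer> filterRefs,
                                          Collection<Integer> projectRefs) {
        SortedSet<Integer> used = new TreeSet<>(filterRefs);
        used.addAll(projectRefs);
        List<String> names = new ArrayList<>();
        for (int i : used) {
            names.add(columns.get(i));
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("a", "b", "c", "d", "e");
        // filter=[$2>10] references $2; projects={$3,$4}.
        List<String> used = referencedColumns(columns, List.of(2), List.of(3, 4));
        System.out.println(used); // [c, d, e]
        System.out.println("SELECT " + String.join(", ", used)
            + " FROM t WHERE " + columns.get(2) + " > 10");
        // SELECT c, d, e FROM t WHERE c > 10
    }
}
```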

This approach works well if the underlying system has limited optimization
opportunities. For example, the typical optimizations for a columnar backend
are column pruning, achieved through project pushdown, and
block/page/partition pruning, achieved through filter pushdown (possibly
with the help of SARGs).

However, some systems may have rich pushdown opportunities. For example, if
the target backend is a JDBC data source, sometimes you may push the whole
operator tree. In this case, the uber operator from the example above might
not work well, as there could be multiple alternative plans. For example,
if the underlying system supports Join and Aggregate pushdown, should we
push "A join B" or "B join A"? Or should we push "Aggregate(Join)" or
"Join(Aggregate)"? In this case, it might be better to convert individual
supported operators one by one with the VolcanoPlanner and the proper cost
model. Examples are Calcite's JDBC infrastructure [2] and Dremio's
integration with other backends via StoragePlugin [3]. This approach is
more powerful but is also much more involved.

From your explanation, it seems that the Project/Filter pushdown into a
custom TableScan operator might be a good starting point.

Regards,
Vladimir.

[1]
https://github.com/apache/flink/blob/release-1.14.2/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
[2]
https://github.com/apache/calcite/blob/calcite-1.28.0/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcRel.java
[3]
https://github.com/dremio/dremio-oss/blob/5d615463e5d468c589c0b91bd20cbf70db3bc581/sabot/kernel/src/main/java/com/dremio/exec/store/StoragePlugin.java#L123

Tue, Dec 21, 2021 at 21:25, Jeremy Dyer <[email protected]>:

> Hi Vladimir,
>
> I'm certain my design has room for improvement and would love any
> suggestions. Here is the use case.
>
> I'm working on Dask-SQL [1]. We wrap Calcite with a Python layer and use
> Calcite to parse, validate, and generate relational algebra. From the
> relational algebra generated we in turn convert those to Dask Python (and
> therefore Dataframe) API calls. Leaving out a lot of detail in a nutshell
> this is the order of what happens.
>
> 1.) Parse SQL Python str to SqlNode
> 2.) Generate RelNode from SqlNode
> 3.) Convert each RexNode into a Python Pandas/cuDF Dataframe - this is the
> step where I want to get the original SQL identifier
>
> For step 3 there are some large performance gains that can be achieved by
> using "predicate pushdown" in the IO readers and for example only reading
> certain columns from a Parquet or ORC file. The format needed to achieve
> this is DNF and requires the original column names so those predicates can
> be passed down into the implementation libraries. The problem is those
> libraries already exist as CUDA C/C++ implementations and cannot be
> modified.
>
> Does that make sense? If there is a more intelligent way to extract
> conditional predicates from the SQL query, even if it isn't at the Rex
> level, I would love to hear suggestions.
>
> [1] - https://github.com/dask-contrib/dask-sql
>
> On Tue, Dec 21, 2021 at 1:05 PM Vladimir Ozerov <[email protected]>
> wrote:
>
> > Hi Jeremy,
> >
> > Could you please share the use case behind this requirement? In the
> > general case, it is not possible to link RelNode's attributes to specific
> > identifiers. For this reason, an attempt to extract such an identifier
> > from any "rel" except for the RelRoot might indicate a design issue.
> >
> > Regards,
> > Vladimir.
> >
> > Tue, Dec 21, 2021 at 20:34, Jeremy Dyer <[email protected]>:
> >
> > > Hello,
> > >
> > > Is it possible to get the original SQL identifier from an instance of
> > > RexInputRef? For example given a simple query like
> > >
> > > SELECT id FROM employees WHERE fname = 'adam'
> > >
> > > Instead of the ordinal name generated by RexInputRef ($11, for
> > > example), I would like to find the original SQL identifier (fname, for
> > > example).
> > >
> > > Thanks,
> > > Jeremy Dyer
> > >
> >
>
