paul-rogers opened a new issue, #13816:
URL: https://github.com/apache/druid/issues/13816

   Proposed is a system to allow MSQ `INSERT` and `REPLACE` statements against rollup tables to be validated against a catalog schema. This is done by making the type of aggregate columns explicit. In addition, for tables not described in the catalog, a new SQL clause, `WITH ROLLUP`, is added to the MSQ statements to make the goal of rollup explicit.
   
   ## User Experience
   
   This feature affects SQL, the catalog, and the SQL planner. Each has 
user-visible revisions.
   
   ### SQL Revision
   
   The `INSERT` and `REPLACE` statements take a new `ROLLUP` clause:
   
   ```sql
   (INSERT | REPLACE) INTO ...
   [ PARTITIONED BY <value> ]
   [ CLUSTERED BY … ]
   [ (WITH | WITHOUT) ROLLUP ]
   ```
   
   The semantics are:
   
   * `WITH ROLLUP`: write segments with rollup enabled. That is, write 
intermediate aggregate values.
   * `WITHOUT ROLLUP`: write segments without rollup enabled. That is, write 
finalized aggregate values.
   * No option: use the default value, or that specified in the catalog.
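   
   For illustration, a hypothetical ingest statement using the new clause might read as follows (the table `dst` and the elided query body are placeholders, not part of the proposal):
   
   ```sql
   INSERT INTO dst
   SELECT ...
   FROM ...
   PARTITIONED BY DAY
   WITH ROLLUP
   ```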
   
   ### Catalog Datasource Metadata Revision
   
   The metadata for a datasource provides a new property, `datasource_type`, which takes three values:
   
   * `detail`: the datasource holds detail rows without rollup.
   * `rollup`: the datasource holds rolled-up rows, with rollup behavior specified per column.
   * Unset: defaults to `detail`.
   
   #### Rollup Column Types
   
   For a detail table, all columns are defined with the existing scalar types. 
For rollup tables, dimension columns take scalar types. A column is a measure 
by virtue of a new aggregate type of the form: `<AGG>(<ARG_TYPE>)`. For example:
   
   * `LATEST(VARCHAR)`
   * `LATEST(BIGINT)`
   * `LATEST(DOUBLE)`
   * `SUM(BIGINT)`
   * And so on.
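
A minimal sketch of how such a catalog type string could be decomposed (the function and pattern names here are hypothetical, not Druid's actual catalog code):

```python
import re

# Hypothetical sketch: split a catalog measure type such as "SUM(BIGINT)"
# into its aggregate name and argument types. A plain scalar type such as
# "VARCHAR" is not an aggregate type and yields None.
AGG_TYPE_PATTERN = re.compile(r"^([A-Z_]+)\((.*)\)$")

def parse_measure_type(type_str):
    """Split 'SUM(BIGINT)' into ('SUM', ['BIGINT']); return None for scalars."""
    m = AGG_TYPE_PATTERN.match(type_str.strip())
    if m is None:
        # Not an aggregate type: treat as a scalar (dimension) type.
        return None
    name, args = m.group(1), m.group(2)
    arg_types = [a.strip() for a in args.split(",")] if args else []
    return (name, arg_types)

print(parse_measure_type("SUM(BIGINT)"))      # ('SUM', ['BIGINT'])
print(parse_measure_type("LATEST(VARCHAR)"))  # ('LATEST', ['VARCHAR'])
print(parse_measure_type("VARCHAR"))          # None
```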
   
   ### Query Validation
   
   With the above in place, we extend query validation.
   
   Setting the `datasource_type` table metadata property to `rollup` is equivalent to specifying `WITH ROLLUP` in the query. The user normally provides either the explicit query option _or_ the table metadata option. If both appear, they must agree.
   
   If neither appears, then for backward compatibility, the behavior depends on the `finalizeAggregates` query context option. If that context option is set along with the SQL `ROLLUP` clause and/or the metadata option, then all the set options must agree.
   
   During validation, when a table has column metadata defined, and the column 
includes a declared type, then the type inferred by the SQL planner must agree 
with the type specified in column metadata. For measures, this means that the 
query must include the correct aggregate that matches the declared aggregate 
type.
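
The agreement rules for the three rollup signals can be sketched as follows (this is an illustrative model, not Druid's actual implementation; each signal is `None` when unset):

```python
# Illustrative sketch of reconciling the three rollup signals: the SQL
# ROLLUP clause, the catalog's datasource_type, and the legacy
# finalizeAggregates context option. finalizeAggregates=True means
# finalized values, i.e. no rollup, so its sense is inverted.

def resolve_rollup(sql_rollup, catalog_rollup, finalize_aggregates):
    """Return True if the ingest should roll up; raise on conflicting options."""
    signals = {
        "ROLLUP clause": sql_rollup,
        "datasource_type": catalog_rollup,
        "finalizeAggregates": None if finalize_aggregates is None
                              else not finalize_aggregates,
    }
    set_signals = {k: v for k, v in signals.items() if v is not None}
    if len(set(set_signals.values())) > 1:
        raise ValueError(f"Conflicting rollup options: {set_signals}")
    if set_signals:
        return next(iter(set_signals.values()))
    return False  # default: detail (no rollup)

print(resolve_rollup(True, None, None))   # True: WITH ROLLUP
print(resolve_rollup(None, True, None))   # True: catalog says rollup
print(resolve_rollup(None, None, None))   # False: default to detail
```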
   
   ## Implementation Discussion
   
   The above behavior is selected to satisfy a number of requirements:
   
   * Validate MSQ ingest queries with rollup. This means that Calcite must 
produce an expression type which can be matched against a string form of a type 
given in catalog metadata.
   * Perform intermediate (not finalized) type propagation at plan time. This 
means that the result type of an aggregate function is an intermediate type for 
that function, not the finalized result as used in non-ingest queries.
   * Avoid aliasing. The actual intermediate type of aggregates is not unique. 
For example, the intermediate type for `MIN`, `MAX` and `SUM` of a `BIGINT` 
column is `BIGINT`. When validating, there is insufficient information to 
ensure that the same aggregate is used for the same datasource across different 
queries. Even when the intermediate type is complex, the complex type often is 
not sufficiently unique. For example, both `EARLIEST` and `LATEST` of a 
`VARCHAR` have an intermediate type of `COMPLEX<Pair<String, Long>>`.
   
   ### Plan-time Aggregate Types
   
   Since neither the finalized nor internal intermediate types satisfy the 
requirements, we invent a new aggregate type for use in the Calcite planner and 
the catalog. The aggregate type is that described above. The type includes both 
the name of the aggregate and the argument type(s). The aggregate name is 
typically the same as the SQL aggregate function name (but not the same as the 
internal aggregate class name). The argument types use SQL types (`VARCHAR`, 
`BIGINT`, `DOUBLE`, etc.)
   
   As noted above, we cannot use the finalized types as they are insufficient 
to guarantee that the user provides the expected aggregate expression for a 
measure column. Suppose column `m` is supposed to be the sum of a `BIGINT` 
value. And suppose we used the finalized types. Then, the following would be 
valid in SQL, but not valid for ingestion:
   
   ```SQL
   INSERT INTO dst
   SELECT …, x AS m
   FROM TABLE(…) (..., x BIGINT)
   ```
   
   In the above, the scalar column `x` is of type `BIGINT` and is thus 
indistinguishable from a `SUM(.)` expression.
   
   ```SQL
   INSERT INTO dst
   SELECT …, SUM(x) + 10 AS m
   FROM TABLE(…) (..., x BIGINT)
   ```
   
   The above query is fine as a normal (non-ingest) query. However, as an 
ingest query, we must store the intermediate result for `SUM(x)` and thus 
cannot add `10` to the intermediate result. Again, the finalized type is not 
sufficient to identify that there is an issue.
   
   ```SQL
   INSERT into dst
   SELECT …, MAX(x) AS m
   FROM TABLE(…) (..., x BIGINT)
   ```
   
   In this third query, `MAX(x)` has a return type of `BIGINT` which is the 
same as the finalized return type of `SUM(BIGINT)`, so the query would be 
incorrectly valid.
   
   Instead, we declare that the SQL type of the `SUM(x)` expression is a type called `SUM(BIGINT)`. Now it is clear that the type of `x` in the first query above is `BIGINT`, which is incomparable with `SUM(BIGINT)`, so the query is not valid. In the second query, the addition has types `SUM(BIGINT) + BIGINT`, which has no implementation, so the second query is also not valid. For the third query, the types are `MAX(BIGINT)` and `SUM(BIGINT)`, which are not compatible, so validation fails. The result is that we can successfully catch and reject queries that do not match the type declared in the table metadata.
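
The validation argument for the three queries above can be sketched as a comparison of type strings (a toy model; the `infer_type` function and its hard-coded cases are hypothetical, standing in for the Calcite planner's type inference):

```python
# Sketch of the validation argument: compare the expression type the
# planner infers against the declared measure type. Equality of the full
# aggregate type string is the test.

def infer_type(expr):
    """Toy inference for the example expressions discussed in the text."""
    if expr == "x":
        return "BIGINT"        # scalar column: incomparable with SUM(BIGINT)
    if expr == "SUM(x)":
        return "SUM(BIGINT)"   # the proposed intermediate aggregate type
    if expr == "MAX(x)":
        return "MAX(BIGINT)"   # wrong aggregate: not compatible
    if expr == "SUM(x) + 10":
        return None            # SUM(BIGINT) + BIGINT has no implementation
    raise ValueError(expr)

declared = "SUM(BIGINT)"
for expr in ["x", "SUM(x) + 10", "MAX(x)", "SUM(x)"]:
    verdict = "valid" if infer_type(expr) == declared else "rejected"
    print(f"{expr}: {verdict}")
```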
   
   Now, let us consider a valid query:
   
   ```SQL
   INSERT into dst
   SELECT …, SUM(x) AS m
   FROM TABLE(…) (..., x BIGINT)
   ```
   
   The type of the `SUM(x)` expression is `SUM(BIGINT)` which is identical to 
the declared type. Thus, the query passes validation.
   
   #### Limitations
   
   The proposed solution does have one limitation: because of the way SQL type 
validation works, Calcite is unable to do the casting necessary to perform 
automatic type conversions. Suppose we now declare column `m` to be of type 
`SUM(DOUBLE)`. The query above would no longer pass validation. Although there 
is an implicit conversion from `BIGINT` to `DOUBLE`, Calcite has insufficient 
information to do that conversion. In SQL, type propagation flows upward, from 
`SELECT` to `INSERT`. In most engines, the `INSERT` statement inserts `CAST` 
statements to convert from the type provided by `SELECT` to the type declared 
in the target table.
   
   Given the current Druid ingest DDL implementation, we’d want the type 
information to flow down from the target table, through `INSERT` into the 
`SELECT` and finally to the `SUM(x)` expression. Since SQL doesn’t work that 
way, we can’t actually make the above happen.
   
   Nor does Druid have a way to cast a `SUM(BIGINT)` intermediate type to a 
`SUM(DOUBLE)` intermediate type at write time: the type of the aggregator just 
is the type of the output column. That is, Druid’s internal implementation has 
no concept of a target schema. Instead, the output of a native query _is_ the 
target schema. Thus, type conversions have to happen at the SQL level. 
(Actually, since the intermediate type of `SUM(BIGINT)` is just `BIGINT`, and 
the intermediate type of `SUM(DOUBLE)` is just `DOUBLE`, Druid might actually 
do the right thing in that case. A `LATEST(BIGINT)` expression feeding a 
`LATEST(DOUBLE)` column is an example where there is no conversion between the 
intermediate `COMPLEX<Pair<BIGINT, LONG>>` and `COMPLEX<Pair<DOUBLE, LONG>>` 
types.)
   
   As a result, the best we can do is to require the user to insert a `CAST` as 
in the following query:
   
   ```SQL
   INSERT into dst
   SELECT …, SUM(CAST(x AS DOUBLE)) AS m
   FROM TABLE(…) (..., x BIGINT)
   ```
   
   At some future time, we could perhaps add Calcite rewrite rules that automatically insert the `CAST`. Or, less likely, we could change Druid’s native query implementation to perform the query-type-to-target-type conversion for intermediate aggregate types.
   
   ### Impact on MSQ
   
   The proposed types appear only in the planner.
   
   MSQ determines whether to finalize or not by considering the 
`finalizeAggregates` context parameter. This parameter is not set automatically 
from either the `ROLLUP` SQL option or from catalog metadata.
   
   The reason the above is true, and the reason that MSQ handles rollup without this change, is that MSQ ignores the declared column type for measures. Instead, MSQ computes the native aggregate intermediate types from the aggregator specs in the native query. Prior to this change, the planner used finalized types during planning; since MSQ ignored those types, it still used the proper intermediate types when the query was not finalized.
   
   After this change, MSQ continues to ignore the declared output row type, and 
continues to re-infer the type from the native query, aggregators and finalize 
options. Since, for measures, MSQ does not consider the planner’s declared 
column type, MSQ is unaffected by this proposal which changes that type on the 
Calcite side.
   
   ### SQL Aggregators Provide the SQL Aggregate Types
   
   Let us now ask, on the query side, where does the proposed aggregate type 
enter the SQL planning process? At present, every Druid aggregator has a SQL 
proxy: `SqlAggregator`, which provides a `SqlAggFunction`, which provides a 
return type inference class which provides the return type for the aggregate 
function. We provide the intermediate type simply by providing the proper 
return type inference class.
   
   ### Dual Operator Tables
   
   Druid queries are normally those that return values to the user, and thus 
use finalized aggregators. All existing `SqlAggFunction` declarations infer the 
finalized return type. We need a way, for MSQ ingest queries, to infer the 
intermediate type instead. One way to do this is to provide two operator 
tables: one with the finalized aggregate functions, another with intermediate 
aggregate functions. A query chooses one or the other depending on whether the 
target table has rollup or not. In practice, because of the timing of the 
creation of the table, we provide a per-query table with a flag that can be set 
to "finalize" or "intermediate" depending on the rollup state discovered while 
parsing the query.
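
A minimal sketch of the per-query table with such a flag (class and table names are hypothetical; in Druid this role is played by `SqlAggregator`/`SqlAggFunction` return type inference, not a Python dict):

```python
# Sketch of a per-query operator table whose return type inference switches
# between finalized and intermediate aggregate types, mirroring the flag
# described above.

# Finalized return types for a couple of example aggregates (illustrative).
FINALIZED_RETURN = {
    ("SUM", "BIGINT"): "BIGINT",
    ("LATEST", "VARCHAR"): "VARCHAR",
}

class QueryOperatorTable:
    def __init__(self):
        # Set while parsing, once the target table's rollup state is known.
        self.finalize = True

    def return_type(self, agg, arg_type):
        if self.finalize:
            return FINALIZED_RETURN[(agg, arg_type)]
        # Intermediate mode: the proposed aggregate type, e.g. "SUM(BIGINT)".
        return f"{agg}({arg_type})"

table = QueryOperatorTable()
print(table.return_type("SUM", "BIGINT"))   # BIGINT (finalized)
table.finalize = False
print(table.return_type("SUM", "BIGINT"))   # SUM(BIGINT) (intermediate)
```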
   
   ### Fix Aggregator Argument Type Inference
   
   The native conversion code contains a bug for some aggregators: the code 
infers the _input_ type of an aggregator by looking at the _output_ type. That 
is, if the type of `SUM(x)` is `LONG`, we can infer that `x` is also `LONG`. 
This bug is mostly benign for finalized aggregators. It will fail, however, for 
`AVG(x)` if `x` is `LONG`. The finalized type has to be `DOUBLE` but that is 
_not_ the input type. It is not clear why this bug has not affected users.
   
   While the bug may be benign for (most) finalized aggregators, it is unworkable for intermediate aggregators. The output type of `SUM(x)`, where `x` is `BIGINT`, is `SUM(BIGINT)`. But that aggregate type is clearly not acceptable as the input type to the aggregator.
   
   Thus, we must fix the bug to obtain the _actual_ argument type, not just 
assume the type. 
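
The failure modes of the output-type guess can be sketched as follows (hypothetical types, not Druid code): the guess happens to be right for `SUM`, is wrong for finalized `AVG`, and is meaningless for intermediate aggregate types.

```python
# Sketch of why inferring an aggregator's input type from its output type
# breaks. The buggy pattern assumes input type == output type.

def output_type(agg, input_type, intermediate=False):
    """Toy model of aggregate output types for the cases in the text."""
    if intermediate:
        return f"{agg}({input_type})"  # e.g. "SUM(BIGINT)": not a valid input type
    if agg == "AVG":
        return "DOUBLE"                # finalized AVG is DOUBLE even for LONG input
    return input_type                  # SUM/MIN/MAX: output happens to equal input

print(output_type("SUM", "BIGINT"))                     # guess happens to be right
print(output_type("AVG", "BIGINT"))                     # DOUBLE: wrong guess for the input
print(output_type("SUM", "BIGINT", intermediate=True))  # unusable as an input type
```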
   
   ## Implementation Plan
   
   The above is to be implemented as part of issue #12546 work. Implementation 
will occur after PR #13686 completes. That PR will enable scalar type 
validation; this proposal adds measure type validation. Testing will occur as 
part of the overall set of catalog-based unit and integration tests.
   

