paul-rogers opened a new issue, #13816: URL: https://github.com/apache/druid/issues/13816
Proposed is a system to allow MSQ `INSERT` and `REPLACE` statements against rollup tables to be validated against a catalog schema. This is done by making the type of aggregate columns explicit. In addition, for tables not specified in the catalog, a new SQL clause, `WITH ROLLUP`, is added to the MSQ statements to make the goal of rollup explicit.

## User Experience

This feature affects SQL, the catalog, and the SQL planner. Each has user-visible revisions.

### SQL Revision

The `INSERT` and `REPLACE` statements take a new `ROLLUP` clause:

```sql
(INSERT | REPLACE) INTO ...
[ PARTITIONED BY <value> ]
[ CLUSTERED BY … ]
[ (WITH | WITHOUT) ROLLUP ]
```

The semantics are:

* `WITH ROLLUP`: write segments with rollup enabled. That is, write intermediate aggregate values.
* `WITHOUT ROLLUP`: write segments without rollup enabled. That is, write finalized aggregate values.
* No option: use the default value, or that specified in the catalog.

### Catalog Datasource Metadata Revision

The metadata for a datasource provides a new property, `datasource_type`, which takes three values:

* `detail`: the datasource holds detail rows without rollup.
* `rollup`: the datasource provides rollup; per-column properties define the measure types.
* Unset: defaults to `detail`.

#### Rollup Column Types

For a detail table, all columns are defined with the existing scalar types. For rollup tables, dimension columns take scalar types. A column is a measure by virtue of a new aggregate type of the form `<AGG>(<ARG_TYPE>)`. For example:

* `LATEST(VARCHAR)`
* `LATEST(BIGINT)`
* `LATEST(DOUBLE)`
* `SUM(BIGINT)`
* And so on.

### Query Validation

With the above in place, we extend query validation. Setting the `datasource_type` table metadata property to `rollup` is equivalent to specifying `WITH ROLLUP` in the query. The user normally provides either the explicit query option _or_ the table metadata option. If both appear, they must agree.
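As an illustrative sketch only (the function name and grammar here are assumptions, not Druid code), a catalog column type of the form `<AGG>(<ARG_TYPE>)` can be split into its aggregate name and argument type, with plain scalar types identifying dimensions:

```python
import re

# Hypothetical sketch: recognize measure types such as "SUM(BIGINT)".
# The accepted grammar is illustrative, not Druid's actual parser.
MEASURE_TYPE = re.compile(r"^([A-Z_]+)\(([A-Z]+)\)$")

def parse_measure_type(type_str):
    """Return (agg_name, arg_type) for a measure type, else None for a
    scalar (dimension) type such as VARCHAR or BIGINT."""
    m = MEASURE_TYPE.match(type_str)
    if m is None:
        return None
    return m.group(1), m.group(2)
```

A validator could use `parse_measure_type` to decide whether a declared column is a dimension or a measure before comparing types.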
If no option appears, then for backward compatibility, the action depends on the `finalizeAggregates` query context option. If that context option is set along with either the SQL `ROLLUP` option and/or the metadata option, then all the set options must agree.

During validation, when a table has column metadata defined, and the column includes a declared type, the type inferred by the SQL planner must agree with the type specified in column metadata. For measures, this means that the query must include the correct aggregate that matches the declared aggregate type.

## Implementation Discussion

The above behavior is selected to satisfy a number of requirements:

* Validate MSQ ingest queries with rollup. This means that Calcite must produce an expression type which can be matched against a string form of a type given in catalog metadata.
* Perform intermediate (not finalized) type propagation at plan time. This means that the result type of an aggregate function is an intermediate type for that function, not the finalized result used in non-ingest queries.
* Avoid aliasing. The actual intermediate type of aggregates is not unique. For example, the intermediate type for `MIN`, `MAX`, and `SUM` of a `BIGINT` column is `BIGINT`. When validating, there is insufficient information to ensure that the same aggregate is used for the same datasource across different queries. Even when the intermediate type is complex, it often is not sufficiently unique. For example, both `EARLIEST` and `LATEST` of a `VARCHAR` have an intermediate type of `COMPLEX<Pair<String, Long>>`.

### Plan-time Aggregate Types

Since neither the finalized nor the internal intermediate types satisfy the requirements, we invent a new aggregate type for use in the Calcite planner and the catalog. The aggregate type is that described above: it includes both the name of the aggregate and the argument type(s).
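The agreement rule among the three sources of the rollup decision can be sketched as follows (an illustrative model of the rule described above, not Druid's implementation; note that `finalizeAggregates=true` means finalized values, i.e. no rollup):

```python
def resolve_rollup(sql_rollup=None, catalog_rollup=None, finalize_ctx=None):
    """Combine the SQL ROLLUP clause, the catalog datasource_type, and the
    finalizeAggregates context option. All options that are set must agree;
    if none is set, default to detail (no rollup)."""
    opts = []
    if sql_rollup is not None:
        opts.append(sql_rollup)
    if catalog_rollup is not None:
        opts.append(catalog_rollup)
    if finalize_ctx is not None:
        # finalizeAggregates=True writes finalized values, i.e. no rollup.
        opts.append(not finalize_ctx)
    if len(set(opts)) > 1:
        raise ValueError("Rollup options conflict")
    return opts[0] if opts else False
```

For example, `WITH ROLLUP` together with `finalizeAggregates=true` would be rejected as conflicting, while an unset query against a `rollup` datasource simply inherits rollup from the catalog.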
The aggregate name is typically the same as the SQL aggregate function name (but not the same as the internal aggregate class name). The argument types use SQL types (`VARCHAR`, `BIGINT`, `DOUBLE`, etc.).

As noted above, we cannot use the finalized types, as they are insufficient to guarantee that the user provides the expected aggregate expression for a measure column. Suppose column `m` is supposed to be the sum of a `BIGINT` value, and suppose we used the finalized types. Then the following would be valid in SQL, but not valid for ingestion:

```SQL
INSERT INTO dst
SELECT …, x AS m
FROM TABLE(…) (..., x BIGINT)
```

In the above, the scalar column `x` is of type `BIGINT` and is thus indistinguishable from a `SUM(.)` expression.

```SQL
INSERT INTO dst
SELECT …, SUM(x) + 10 AS m
FROM TABLE(…) (..., x BIGINT)
```

The above query is fine as a normal (non-ingest) query. However, as an ingest query, we must store the intermediate result for `SUM(x)` and thus cannot add `10` to the intermediate result. Again, the finalized type is not sufficient to identify that there is an issue.

```SQL
INSERT INTO dst
SELECT …, MAX(x) AS m
FROM TABLE(…) (..., x BIGINT)
```

In this third query, `MAX(x)` has a return type of `BIGINT`, which is the same as the finalized return type of `SUM(BIGINT)`, so the query would be incorrectly accepted as valid.

However, suppose we instead declare that the SQL type of the `SUM(BIGINT)` aggregate is a type called `SUM(BIGINT)`. Now it is clear that the type of `x` in the first query above is `BIGINT`, which is incomparable with `SUM(BIGINT)`, so the query is not valid. In the second query, the addition has types `SUM(BIGINT) + BIGINT`, which has no implementation, so the second query is also not valid. For the third query, the types are `MAX(BIGINT)` and `SUM(BIGINT)`, which are not compatible, so validation fails. The result is that we can successfully catch and reject queries that do not match the type declared in the table metadata.
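The three rejection cases above can be modeled with a toy type-inference sketch (hypothetical names and a deliberately tiny expression encoding, not Druid's planner): a scalar reference keeps its scalar type, an aggregate call gets the proposed `AGG(ARG)` type, and arithmetic on an aggregate type has no implementation.

```python
def infer_type(expr):
    """Toy type inference for the examples above. Expressions are tuples:
    ("col", sql_type), ("agg", name, arg_expr), or ("plus", lhs, rhs)."""
    kind = expr[0]
    if kind == "col":
        return expr[1]
    if kind == "agg":
        # Proposed aggregate type: name plus argument type, e.g. SUM(BIGINT).
        return f"{expr[1]}({infer_type(expr[2])})"
    if kind == "plus":
        lhs, rhs = infer_type(expr[1]), infer_type(expr[2])
        if "(" in lhs or "(" in rhs:
            raise TypeError(f"no operator for {lhs} + {rhs}")
        return lhs
    raise ValueError(kind)

def validate_measure(declared, expr):
    """A measure passes only when its inferred type string exactly
    matches the declared catalog type."""
    try:
        return infer_type(expr) == declared
    except TypeError:
        return False
```

With `m` declared as `SUM(BIGINT)`: `x AS m` infers `BIGINT` (rejected), `SUM(x) + 10` fails to type-check (rejected), `MAX(x)` infers `MAX(BIGINT)` (rejected), and `SUM(x)` infers `SUM(BIGINT)` (accepted).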
Now, let us consider a valid query:

```SQL
INSERT INTO dst
SELECT …, SUM(x) AS m
FROM TABLE(…) (..., x BIGINT)
```

The type of the `SUM(x)` expression is `SUM(BIGINT)`, which is identical to the declared type. Thus, the query passes validation.

#### Limitations

The proposed solution does have one limitation: because of the way SQL type validation works, Calcite is unable to do the casting necessary to perform automatic type conversions. Suppose we now declare column `m` to be of type `SUM(DOUBLE)`. The query above would no longer pass validation. Although there is an implicit conversion from `BIGINT` to `DOUBLE`, Calcite has insufficient information to do that conversion.

In SQL, type propagation flows upward, from `SELECT` to `INSERT`. In most engines, the `INSERT` statement inserts `CAST` statements to convert from the type provided by `SELECT` to the type declared in the target table. Given the current Druid ingest DDL implementation, we'd want the type information to flow down from the target table, through `INSERT` into the `SELECT`, and finally to the `SUM(x)` expression. Since SQL doesn't work that way, we can't actually make the above happen.

Nor does Druid have a way to cast a `SUM(BIGINT)` intermediate type to a `SUM(DOUBLE)` intermediate type at write time: the type of the aggregator just is the type of the output column. That is, Druid's internal implementation has no concept of a target schema. Instead, the output of a native query _is_ the target schema. Thus, type conversions have to happen at the SQL level.

(Actually, since the intermediate type of `SUM(BIGINT)` is just `BIGINT`, and the intermediate type of `SUM(DOUBLE)` is just `DOUBLE`, Druid might actually do the right thing in this case. A `LATEST(BIGINT)` expression feeding a `LATEST(DOUBLE)` column is an example where there is no conversion between the intermediate `COMPLEX<Pair<BIGINT, LONG>>` and `COMPLEX<Pair<DOUBLE, LONG>>` types.)
As a result, the best we can do is to require the user to insert a `CAST`, as in the following query:

```SQL
INSERT INTO dst
SELECT …, SUM(CAST(x AS DOUBLE)) AS m
FROM TABLE(…) (..., x BIGINT)
```

At some future time, we could perhaps add Calcite rewrite rules that automatically insert the `CAST`. Or, less likely, we could change Druid's native query implementation to perform the query-type-to-target-type conversion for intermediate aggregate types.

### Impact on MSQ

The proposed types appear only in the planner. MSQ determines whether to finalize or not by considering the `finalizeAggregates` context parameter. This parameter is not set automatically from either the `ROLLUP` SQL option or from catalog metadata.

The reason the above is true, and the reason that MSQ handles rollup without this change, is that MSQ ignores the declared column type for measures. Instead, MSQ computes the native aggregate intermediate types from the aggregator specs in the native query. Prior to this change, the planner used finalized types during planning. Since MSQ ignored these types, MSQ used the proper intermediate types when the query was not finalized. After this change, MSQ continues to ignore the declared output row type, and continues to re-infer the type from the native query, aggregators, and finalize options. Since, for measures, MSQ does not consider the planner's declared column type, MSQ is unaffected by this proposal, which changes that type only on the Calcite side.

### SQL Aggregators Provide the SQL Aggregate Types

Let us now ask: on the query side, where does the proposed aggregate type enter the SQL planning process? At present, every Druid aggregator has a SQL proxy: `SqlAggregator`, which provides a `SqlAggFunction`, which provides a return type inference class, which provides the return type for the aggregate function. We provide the intermediate type simply by providing the proper return type inference class.
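The finalized-versus-intermediate distinction in return-type inference can be sketched as a pair of lookup tables (the specific type mappings below are illustrative assumptions; Druid's actual mappings live in each aggregator's return type inference class):

```python
# Illustrative example mappings only, not Druid's full tables.
FINALIZED = {
    ("SUM", "BIGINT"): "BIGINT",
    ("LATEST", "VARCHAR"): "VARCHAR",
}
INTERMEDIATE = {
    # Proposed planner-side aggregate types for ingest queries.
    ("SUM", "BIGINT"): "SUM(BIGINT)",
    ("LATEST", "VARCHAR"): "LATEST(VARCHAR)",
}

def return_type(agg, arg_type, finalize):
    """Infer the planner-visible return type of an aggregate call,
    choosing finalized or intermediate inference."""
    table = FINALIZED if finalize else INTERMEDIATE
    return table[(agg, arg_type)]
```

A normal query would infer `VARCHAR` for `LATEST` over a `VARCHAR` column, while an ingest query into a rollup table would infer the proposed `LATEST(VARCHAR)` type for the same expression.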
### Dual Operator Tables

Druid queries are normally those that return values to the user, and thus use finalized aggregators. All existing `SqlAggFunction` declarations infer the finalized return type. We need a way, for MSQ ingest queries, to infer the intermediate type instead. One way to do this is to provide two operator tables: one with the finalized aggregate functions, another with the intermediate aggregate functions. A query chooses one or the other depending on whether the target table has rollup or not. In practice, because of the timing of the creation of the table, we provide a per-query table with a flag that can be set to "finalize" or "intermediate" depending on the rollup state discovered while parsing the query.

### Fix Aggregator Argument Type Inference

The native conversion code contains a bug for some aggregators: the code infers the _input_ type of an aggregator by looking at the _output_ type. That is, if the type of `SUM(x)` is `LONG`, we infer that `x` is also `LONG`. This bug is mostly benign for finalized aggregators. It will fail, however, for `AVG(x)` if `x` is `LONG`: the finalized type has to be `DOUBLE`, but that is _not_ the input type. It is not clear why this bug has not affected users.

While the bug may be benign for (most) finalized aggregators, it is unworkable for intermediate aggregators. The output type of `SUM(x)`, where `x` is `BIGINT`, is `SUM(BIGINT)`. But that aggregate type is clearly not acceptable as the input type to the aggregator. Thus, we must fix the bug to obtain the _actual_ argument type, not just assume the type.

## Implementation Plan

The above is to be implemented as part of the issue #12546 work. Implementation will occur after PR #13686 completes. That PR will enable scalar type validation; this proposal adds measure type validation. Testing will occur as part of the overall set of catalog-based unit and integration tests.
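As an appendix-style sketch of the argument-type-inference fix described above (names are hypothetical, not Druid's conversion code): the buggy pattern derives the input type from the aggregate's output type, which breaks for `AVG` and for every intermediate aggregate type, while the fix reads the type of the actual argument expression.

```python
# Buggy pattern: assume the aggregator's input type equals its output type.
# Wrong for AVG (LONG input, DOUBLE output) and for any intermediate
# aggregate type, since e.g. "SUM(BIGINT)" is not a valid input type.
def infer_arg_type_from_output(output_type):
    return output_type

# Fixed pattern: take the type from the argument expression itself.
def infer_arg_type_from_args(arg_types):
    return arg_types[0]
```

With `AVG(x)` where `x` is `LONG`, the buggy path yields `DOUBLE` (the finalized output type) as the supposed input type, while the fixed path correctly yields `LONG`.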
