paul-rogers opened a new issue, #12546:
URL: https://github.com/apache/druid/issues/12546

   Druid is a powerful database optimized for time series at extreme scale. 
Druid provides features beyond those of a typical RDBMS: a flexible schema, 
ability to ingest from external data sources, support for long-running ingest 
jobs, and more. Users coming from a traditional database can be overwhelmed by 
the many choices. Any given application or datasource uses a subset of those 
features; it would be convenient for Druid, rather than the user, to remember 
the feature choices which the user made.
   
   For example, Druid provides many different segment granularities, yet any 
given datasource tends to prefer one of them on ingest. Druid allows each 
segment to have a distinct schema, but many datasources want to ensure that at 
least some minimal set of “key” columns exist. Most datasources use the same 
metric definitions on each ingest. And so on.
   
   Traditional RDBMS systems use a *catalog* to record the schema of tables, 
the structure of indexes, entity-relationships between tables and so on. In 
such systems, the catalog is an essential part of the system: it is the only 
way to interpret the layout of the binary table data, say, or to know which 
indexes relate to which tables. Druid is quite different: each segment is 
self-contained, carrying its own “mini-catalog.”
   
   Still, as Druid adds more SQL functionality, we believe it will be 
convenient for users to have an optional catalog of table (datasource) 
definitions to avoid the need to repeat common table properties. This is 
especially useful for the proposed [multi-stage ingest 
project](https://github.com/apache/druid/issues/12262).
   
   ## Proposal Summary
   
   Proposed is an add-on metadata catalog that allows the user to record data 
shape decisions in Druid and reuse them. The catalog contains:
   
   * Definitions of input tables, to avoid the awkward JSON required in the 
proposed SQL-based multi-stage ingestion solution.
   * Definitions of datasources, especially for rollup, to avoid the redundant 
SQL and query context variables required by the multi-stage ingestion solution.
   * Definitions of views, as an alternative to the current stub view solution.
   
   Technically, the proposal envisions the following:
   
   * A “catalog” (metadata DB table) to hold “table” definitions for 
datasources, input sources and views.
   * REST API calls for catalog CRUD operations.
   * Integration with the Druid query planner to use catalog entries.
   * SQL DDL statements to work with metadata (eventually).
   
   ## Motivation
   
   With the catalog, a user can define an ingestion input source separate from 
a SQL INSERT statement. This is handy as the current EXTERN syntax requires 
that the user write out the input definition in JSON within a SQL statement.
   
   The user first defines the input table, using the REST API or (eventually) 
the SQL DDL statements. Then, the user references the input table as if it were 
a SQL table. An example of one of the `CalciteInsertDmlTest` cases using an 
input table definition:
   
   ```sql
   INSERT INTO dst SELECT *
   FROM "input"."inline"
   PARTITIONED BY ALL TIME
   ```
   
   Here `input` is a schema that contains input table definitions, while 
`inline` is a user-defined table backed by an in-line CSV input source.
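   For comparison, without the catalog the same ingest must embed the input 
definition as JSON inside the SQL statement via the proposed `EXTERN` table 
function, along these lines (the JSON details here are illustrative):
   
   ```sql
   INSERT INTO dst SELECT *
   FROM TABLE(
     EXTERN(
       '{"type": "inline", "data": "a,b,1\nc,d,2\n"}',
       '{"type": "csv", "columns": ["x", "y", "z"]}',
       '[{"name": "x", "type": "STRING"},
         {"name": "y", "type": "STRING"},
         {"name": "z", "type": "LONG"}]'
     )
   )
   PARTITIONED BY ALL TIME
   ```
   
   The catalog entry captures those JSON arguments once, so each ingest 
statement can use the simple table reference instead.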
   
   Similarly, when using SQL to ingest into a datasource, the user can define 
things like segment granularity in the catalog rather than manually including 
it in each SQL statement.
   
   We expect to support additional use cases over time: the above should 
provide a sense of how the catalog can be used.
   
   ## Catalog as "Hints"
   
   Druid has gotten by this long without a catalog, so the use of the catalog 
is entirely optional: use it if it is convenient, specify things explicitly if 
that is more convenient. For this reason, the catalog can be seen as a set of 
hints. The "hint" idea contrasts with the traditional RDBMS (or the Hive) model 
in which the catalog is required.
   
   ### Input Tables
   
   Unlike query tools such as Impala or Presto, Druid never reads the 
same inputs twice: each ingestion reads a distinct set of input files. The 
input table definition must therefore provide a way to parameterize the actual 
set of files: perhaps the S3 bucket or HDFS location is the same, and the file 
layout is the same, but the specific files differ on each run.
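   Purely as an illustration (no syntax is proposed here), a parameterized 
input table reference might let the catalog fix the location and format while 
each run supplies only the files:
   
   ```sql
   -- Hypothetical syntax: the catalog entry for "web_logs" holds the
   -- S3 bucket and CSV format; only the file list varies per run.
   INSERT INTO dst
   SELECT * FROM "input"."web_logs" (files => 'logs/2022-05-01/*.csv.gz')
   PARTITIONED BY ALL TIME
   ```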
   
   ## Components
   
   The major components of the metadata system follow along the lines of 
similar mechanisms within Druid: basic authentication, segment publish state, 
etc. There appears to be no single Druid sync framework to keep nodes 
synchronized with the Coordinator, so we adopt bits and pieces from each.
   
   ### Metadata DB Extension
   
   Defines a new table, perversely named "tables", that holds the metadata for 
a "table." A datasource is a table, but so is a view or an input source. The 
metadata DB extension is modeled after many others: it provides the basic CRUD 
semantics. It also maintains a simple version (timestamp) to catch concurrent 
updates.
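   The version check can be sketched as follows. This is a toy in-memory model 
with invented names, not the actual extension code; it uses a counter where the 
proposal uses a timestamp, to keep the sketch deterministic:
   
   ```python
   class TableCatalog:
       """Toy in-memory stand-in for the proposed "tables" metadata table.

       Illustrates the optimistic-locking version check used to catch
       concurrent updates. All names here are hypothetical.
       """

       def __init__(self):
           self._entries = {}   # table name -> (version, spec)
           self._clock = 0

       def _next_version(self):
           self._clock += 1
           return self._clock

       def create(self, name, spec):
           version = self._next_version()
           self._entries[name] = (version, spec)
           return version

       def read(self, name):
           return self._entries[name]

       def update(self, name, spec, expected_version):
           version, _ = self._entries[name]
           if version != expected_version:
               # Another writer updated the row first: reject, don't clobber.
               raise RuntimeError("concurrent update detected")
           version = self._next_version()
           self._entries[name] = (version, spec)
           return version
   ```
   
   A second writer holding a stale version gets an error rather than silently 
overwriting the first writer's change.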
   
   ### REST Endpoint
   
   Provides the usual CRUD operations via REST calls as operations on the 
Coordinator, proxied through the Router. Security in these endpoints is simple: 
it is based on security of the underlying object: view, datasource, etc.
   
   ### DB Synchronization
   
   Keeps Broker nodes updated to the latest state of the catalog DB. Patterned 
after the mechanism in the basic auth extension, but with a delta update 
feature borrowed from another extension that already provides one.
   
   ### Planner Integration
   
   The primary focus of this project is using catalog metadata for SQL 
statements, in particular INSERT and REPLACE statements. Input tables replace 
the need for the EXTERN macro; datasource metadata replaces the need to spell 
out partitioning and clustering.
   
   ## Rollup Datasources
   
   The main challenge is around rollup datasources. In rollup, the datasource 
performs aggregation. It is easy to think that ingestion does the aggregation, 
but consider this example: ingest a set of files, each with one row. You'll get 
a set of, say, dozens of single-row segments, each with the "aggregation" of a 
single row. The compaction mechanism then combines these segments to produce 
one with overall totals. This process continues if we add more segments in the 
same time interval and compact again.
   
   This little example points out that compaction knows how to further 
aggregate segments: even those with a single row. Of course, ingestion can do 
the same trick, if there happen to be rows with the same dimensions. But, since 
compaction can also do it, we know that there is sufficient state in the 
one-row "seed" aggregate for further compaction to occur. We want to leverage 
this insight.
   
   The idea is, in SQL INSERT-style ingestion, the work happens in three parts:
   
   * The SELECT statement produces a tuple of values for one "detail" row.
   * The INSERT statement maps those values into a single datasource row.
   * Later stages of ingestion (including compaction) aggregate the single-row 
"seed" aggregates.
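   The three steps can be sketched as follows, using invented names and a toy 
in-memory representation rather than Druid internals:
   
   ```python
   # Toy model of rollup via single-row "seed" aggregates. Each ingested row
   # becomes a one-row "segment" whose measures are already in aggregate form
   # (count=1, sum=value). Compaction then merges segments that share the same
   # dimension values; merging is the same operation at every later stage.

   def seed(row, dims, metrics):
       """Turn one detail row into a single-row aggregate "segment"."""
       key = tuple(row[d] for d in dims)
       return {key: {"cnt": 1, **{m: row[m] for m in metrics}}}

   def compact(segments):
       """Merge segments by dimension key, summing the seed aggregates."""
       merged = {}
       for seg in segments:
           for key, agg in seg.items():
               if key not in merged:
                   merged[key] = dict(agg)
               else:
                   for m, v in agg.items():
                       merged[key][m] += v
       return merged
   ```
   
   Because the seed already carries sufficient aggregate state, compacting a 
compacted result is the same operation again, which is the insight the proposal 
leverages.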
   
   This means that we can convert the following example:
   
   ```sql
   INSERT INTO "kttm_rollup"
   
   WITH kttm_data AS ...
   
   SELECT
     FLOOR(TIME_PARSE("timestamp") TO MINUTE) AS __time,
     session,
     agent_category,
     agent_type,
     browser,
     browser_version,
     MV_TO_ARRAY("language") AS "language", -- Multi-value string dimension
     os,
     city,
     country,
     forwarded_for AS ip_address,
   
     COUNT(*) AS "cnt",
     SUM(session_length) AS session_length,
     APPROX_COUNT_DISTINCT_DS_HLL(event_type) AS unique_event_types
   FROM kttm_data
   WHERE os = 'iOS'
   GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11
   PARTITIONED BY HOUR
   CLUSTERED BY browser, session
   ```
   
   To this form:
   
   ```sql
   INSERT INTO "kttm_rollup"
   SELECT
     TIME_PARSE("timestamp") AS __time,
     session,
     agent_category,
     agent_type,
     browser,
     browser_version,
     language, -- Multi-value string dimension
     os,
     city,
     country,
     forwarded_for AS ip_address,
   
     session_length,
     event_type AS unique_event_types
   FROM "input".kttm_data
   WHERE os = 'iOS'
   ```
   
   Here:
   
   * The input comes from the catalog.
   * The GROUP BY (dimensions), PARTITIONED BY (segment granularity) and 
CLUSTERED BY (secondary partitioning) are specified in the catalog.
   * The SELECT produces single values for both dimensions and measures.
   * A Druid extension to SQL maps columns by name, not position, so the column 
names produced by the SELECT must match the name and type (or be convertible to 
the type) of a datasource column as defined in the catalog.
   * For the `__time` column, the metadata records the rollup grain, so the 
user can omit the `TIME_FLOOR` in the SQL: the metadata will cause the 
planner to insert the proper `TIME_FLOOR` function.
   * The `COUNT` column is not specified: it is implicitly 1 for every row: no 
need to have the user tell us that. Later stages use a "sum" to accumulate the 
counts, as today.
   * The multi-value string column is implicitly converted from a scalar string 
to a single-element array.
   * A single value is also the SUM for that one row, so the SUM function can 
be omitted.
   * The `APPROX_COUNT_DISTINCT_DS_HLL` function takes a single argument, so 
the planner can infer to use that function to convert from a scalar to a "seed" 
aggregate.
   
   The column-level rules operate much like the built-in type coercion rules 
which SQL provides. Instead of simply converting an `INT` to a `BIGINT`, Druid 
adds rules to implicitly convert a scalar `BIGINT` to a `SUM(BIGINT)` column.
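   A sketch of such a rule, again with invented names and an invented in-memory 
representation: the catalog declares each datasource column's type, and any 
scalar value destined for a `SUM` column is implicitly wrapped into a one-row 
"seed" aggregate:
   
   ```python
   # Hypothetical catalog column declarations for a rollup datasource.
   CATALOG_COLUMNS = {
       "session": "STRING",
       "session_length": "SUM(BIGINT)",
       "cnt": "SUM(BIGINT)",   # implicit: 1 per row, the user never writes it
   }

   def map_row(select_row):
       """Map SELECT output columns by name onto catalog columns, applying
       the implicit scalar-to-aggregate coercion for SUM columns."""
       out = {}
       for name, value in select_row.items():
           if name not in CATALOG_COLUMNS:
               raise ValueError(f"no catalog column named {name!r}")
           if CATALOG_COLUMNS[name].startswith("SUM("):
               # A single scalar is its own one-row SUM: coerce implicitly.
               out[name] = {"sum": value}
           else:
               out[name] = value
       out.setdefault("cnt", {"sum": 1})   # implicit COUNT seed
       return out
   ```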
   
   ## Extensions
   
   A possible feature is to allow an external service to provide the catalog 
via an extension. An example of this is to use the Confluent schema registry, 
the Hive Metastore, etc. We'll flesh out this option a bit more as we get 
further along. 
   
   ## Alternatives
   
   The three main alternatives are:
   
   * Do nothing: segments continue to provide any needed metadata; ingestion 
specs and INSERT SQL statements spell out that information when creating new 
segments. This puts the burden on the user. Users typically handle this by 
having a single ingestion "template" that they copy/paste for each ingestion. 
Such an approach runs a bit counter, however, to the "ease-of-use" goals for 
SQL, and does not follow the usual SQL pattern in which a catalog provides 
table descriptions (the physical model), and users use SQL to specify 
operations to perform on that data (logical operations.)
   * Use third-party catalogs such as Hive HMS, Amazon Glue or other data 
governance solutions. As noted above, users that have such catalogs may be able 
to use them via an extension (assuming we can define a good extension API and 
the third-party solutions can provide the Druid-specific information required).
   * Built-in Druid solution: this proposal.
   
   Since the catalog will become core to Druid, we tend to favor creating one 
focused on Druid's rather unique needs.
   
   ## Development Phases
   
   This project has multiple parts. A basic plan is:
   
   * [Planner test framework](https://github.com/apache/druid/pull/12545), to 
allow safe revision of the planner.
   * Input tables for INSERT queries. Creates the basic DB, RPC and other 
mechanisms.
   * Datasource properties for non-rollup datasources. (Partitioning, 
clustering, etc.)
   * Rollup support. (This is the hard part.)
   
   ## Backward Compatibility
   
   Existing Druid installations will create the new catalog table upon upgrade. 
It will start empty. If a datasource has no metadata then Druid will behave 
exactly as it did before the upgrade.
   
   If a version with the catalog is downgraded, the old Druid version will 
simply ignore the catalog and the user must explicitly provide the properties 
formerly provided by the catalog.
   
   

