paul-rogers opened a new issue, #12546:
URL: https://github.com/apache/druid/issues/12546
Druid is a powerful database optimized for time series at extreme scale. Druid provides features beyond those of a typical RDBMS: a flexible schema, the ability to ingest from external data sources, support for long-running ingest jobs, and more. Users coming from a traditional database can be overwhelmed by the many choices. Any given application or datasource uses a subset of those features; it would be convenient for Druid, rather than the user, to remember the feature choices which the user made. For example, Druid provides many different segment granularities, yet any given datasource tends to prefer one of them on ingest. Druid allows each segment to have a distinct schema, but many datasources want to ensure that at least some minimal set of “key” columns exist. Most datasources use the same metric definitions on each ingest. And so on.

Traditional RDBMS systems use a *catalog* to record the schema of tables, the structure of indexes, entity-relationships between tables, and so on. In such systems, the catalog is an essential part of the system: it is the only way to interpret the layout of the binary table data, say, or to know which indexes relate to which tables. Druid is much different: each segment is self-contained, with its own “mini-catalog.” Still, as Druid adds more SQL functionality, we believe it will be convenient for users to have an optional catalog of table (datasource) definitions to avoid the need to repeat common table properties. This is especially useful for the proposed [multi-stage ingest project](https://github.com/apache/druid/issues/12262).

## Proposal Summary

Proposed is an add-on metadata catalog that allows the user to record data shape decisions in Druid and reuse them. The catalog contains:

* Definitions of input tables, to avoid the awkward JSON required in the proposed SQL-based multi-stage ingestion solution.
* Definitions of datasources, especially for rollup, to avoid the redundant SQL and query context variables required by the multi-stage ingestion solution.
* Definitions of views, as an alternative to the current stub view solution.

Technically, the proposal envisions the following:

* A “catalog” (metadata DB table) to hold “table” definitions for datasources, input sources and views.
* REST API calls for catalog CRUD operations.
* Integration with the Druid query planner to use catalog entries.
* SQL DDL statements to work with metadata (eventually).

## Motivation

With the catalog, a user can define an ingestion input source separately from a SQL INSERT statement. This is handy because the current EXTERN syntax requires that the user write out the input definition in JSON within a SQL statement (a sketch of that form appears below for comparison). The user first defines the input table, using the REST API or (eventually) the SQL DDL statements. Then, the user references the input table as if it were a SQL table. An example of one of the `CalciteInsertDmlTest` cases using an input table definition:

```sql
INSERT INTO dst
SELECT *
FROM "input"."inline"
PARTITIONED BY ALL TIME
```

Here `input` is a schema that contains input table definitions, while `inline` is a user-defined table backed by an in-line CSV input source.
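For comparison, a rough sketch of the EXTERN-based form that a catalog-defined input table would replace is shown below. The shape follows the multi-stage ingest proposal; the inline data, column names, and exact JSON are illustrative only.

```sql
INSERT INTO dst
SELECT *
FROM TABLE(
  EXTERN(
    '{"type": "inline", "data": "a,b,1\nc,d,2\n"}',
    '{"type": "csv", "columns": ["x", "y", "z"]}',
    '[{"name": "x", "type": "string"}, {"name": "y", "type": "string"}, {"name": "z", "type": "long"}]'
  )
)
PARTITIONED BY ALL TIME
```

With a catalog entry for `"input"."inline"`, all of that JSON moves out of the statement and into the input table definition.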
Similarly, when using SQL to ingest into a datasource, the user can define things like segment granularity in the catalog rather than manually including them in each SQL statement. We expect to support additional use cases over time: the above should provide a sense of how the catalog can be used.

## Catalog as "Hints"

Druid has gotten by this long without a catalog, so use of the catalog is entirely optional: use it if it is convenient, specify things explicitly if that is more convenient. For this reason, the catalog can be seen as a set of hints. The "hint" idea contrasts with the traditional RDBMS (or Hive) model in which the catalog is required.

### Input Tables

Unlike query tools such as Impala or Presto, Druid never reads the same inputs twice: each ingestion reads a distinct set of input files. The input table definition must therefore provide a way to parameterize the actual set of files. Perhaps the S3 bucket or HDFS location is the same, and the file layout is the same, but the specific files differ on each run.

## Components

The major components of the metadata system follow along the lines of similar mechanisms within Druid: basic authentication, segment publish state, etc. There appears to be no single Druid sync framework to keep nodes synchronized with the Coordinator, so we adopt bits and pieces from each.

### Metadata DB Extension

Defines a new table, perversely named "tables", that holds the metadata for a "table." A datasource is a table, but so is a view or an input source. The metadata DB extension is modeled after many others: it provides the basic CRUD semantics. It also maintains a simple version (timestamp) to catch concurrent updates.
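As a purely illustrative sketch (the proposal does not define the physical layout; the column names, types, and `druid_` prefix below are guesses), the "tables" table might look something like the following, with the update timestamp doubling as the version used to detect concurrent updates:

```sql
-- Hypothetical layout for the catalog's "tables" metadata table.
-- Every name and type here is illustrative, not part of the proposal.
CREATE TABLE druid_tables (
  schema_name   VARCHAR(255) NOT NULL,  -- e.g. a datasource, input, or view schema
  table_name    VARCHAR(255) NOT NULL,
  creation_time BIGINT       NOT NULL,  -- millis since epoch
  update_time   BIGINT       NOT NULL,  -- also serves as the version for optimistic locking
  payload       BLOB         NOT NULL,  -- JSON spec: granularity, columns, input source, etc.
  PRIMARY KEY (schema_name, table_name)
);
```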
### REST Endpoint

Provides the usual CRUD operations via REST calls as operations on the Coordinator, proxied through the Router. Security in these endpoints is simple: it is based on the security of the underlying object: view, datasource, etc.

### DB Synchronization

Keeps Broker nodes updated to the latest state of the catalog DB. Patterned after the mechanism in the basic auth extension, but with a delta-update feature borrowed from an extension that has that feature.

### Planner Integration

The primary focus of this project is using catalog metadata for SQL statements and, in particular, INSERT and REPLACE statements. Input tables replace the need for the EXTERN macro; datasource metadata replaces the need to spell out partitioning and clustering.

## Rollup Datasources

The main challenge is around rollup datasources. In rollup, the datasource performs aggregation. It is easy to think that ingestion does the aggregation, but consider this example: ingest a set of files, each with one row. You'll get a set of, say, dozens of single-row segments, each with the "aggregation" of a single row. The compaction mechanism then combines these segments to produce one with overall totals. This process continues if we add more segments in the same time interval and compact again.

This little example points out that compaction knows how to further aggregate segments: even those with a single row. Of course, ingestion can do the same trick, if there happen to be rows with the same dimensions. But, since compaction can also do it, we know that there is sufficient state in the one-row "seed" aggregate for further compaction to occur. We want to leverage this insight. The idea is that, in SQL INSERT-style ingestion, the work happens in three parts:

* The SELECT statement produces a tuple of values for one "detail" row.
* The INSERT statement maps those values into a single datasource row.
* Later stages of ingestion (including compaction) aggregate the single-row "seed" aggregates.

This means that we can convert the following example:

```sql
INSERT INTO "kttm_rollup"
WITH kttm_data AS ...
SELECT
  FLOOR(TIME_PARSE("timestamp") TO MINUTE) AS __time,
  session,
  agent_category,
  agent_type,
  browser,
  browser_version,
  MV_TO_ARRAY("language") AS "language", -- Multi-value string dimension
  os,
  city,
  country,
  forwarded_for AS ip_address,
  COUNT(*) AS "cnt",
  SUM(session_length) AS session_length,
  APPROX_COUNT_DISTINCT_DS_HLL(event_type) AS unique_event_types
FROM kttm_data
WHERE os = 'iOS'
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11
PARTITIONED BY HOUR
CLUSTERED BY browser, session
```

To this form:

```sql
INSERT INTO "kttm_rollup"
SELECT
  TIME_PARSE("timestamp") AS __time,
  session,
  agent_category,
  agent_type,
  browser,
  browser_version,
  language, -- Multi-value string dimension
  os,
  city,
  country,
  forwarded_for AS ip_address,
  session_length,
  event_type AS unique_event_types
FROM "input".kttm_data
WHERE os = 'iOS'
```

Here:

* The input comes from the catalog.
* The GROUP BY (dimensions), PARTITIONED BY (segment granularity) and CLUSTERED BY (secondary partitioning) are specified in the catalog.
* The SELECT produces single values for both dimensions and measures.
* A Druid extension to SQL maps columns by name, not position, so the column names produced by the SELECT must match the name and type (or be convertible to the type) of a datasource column as defined in the catalog.
* For the `__time` column, the metadata specifies the rollup grain, so the user can omit the `TIME_FLOOR` in the SQL: the metadata will cause the planner to insert the proper `TIME_FLOOR` function.
* The `COUNT` column is not specified: it is implicitly 1 for every row; no need to have the user tell us that. Later stages use a "sum" to accumulate the counts, as today.
* The multi-value dimension column is implicitly converted from a scalar string to a single-element array.
* A single value is also the SUM for that one row, so the SUM function can be omitted.
* The `APPROX_COUNT_DISTINCT_DS_HLL` function takes a single argument, so the planner can infer that it should apply that function to convert the scalar into a "seed" aggregate.

The column-level rules operate much like the built-in type coercion rules which SQL provides. Instead of simply converting an `INT` to a `BIGINT`, Druid adds rules to implicitly convert a scalar `BIGINT` column into a `SUM(BIGINT)` column.
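To make the "seed aggregate" idea concrete, here is a minimal illustration in generic SQL (not Druid SQL; the values are made up). Aggregating a single row just returns that row's value, which is why a scalar such as `session_length` can stand in for `SUM(session_length)` and still be folded correctly by later stages:

```sql
-- One "detail" row: a session that lasted 42 seconds.
-- SUM over that single row is simply 42, so the scalar itself is a valid seed aggregate.
SELECT SUM(session_length) AS session_length
FROM (VALUES (42)) AS detail(session_length);
-- Returns 42.

-- Later stages (such as compaction) keep folding seed aggregates into larger totals.
SELECT SUM(session_length) AS session_length
FROM (VALUES (42), (17)) AS seeds(session_length);
-- Returns 59.
```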
## Extensions

A possible feature is to allow an external service to provide the catalog via an extension. An example of this is to use the Confluent schema registry, the Hive Metastore, etc. We'll flesh out this option a bit more as we get further along.

## Alternatives

The three main alternatives are:

* Do nothing: segments continue to provide any needed metadata; ingestion specs and INSERT SQL statements spell out that information when creating new segments. This puts the burden on the user. Users typically handle this by having a single ingestion "template" that they copy/paste for each ingestion. Such an approach runs a bit counter, however, to the "ease-of-use" goals for SQL, and does not follow the usual SQL pattern in which a catalog provides table descriptions (the physical model), and users use SQL to specify operations to perform on that data (logical operations).
* Use third-party catalogs such as the Hive HMS, Amazon Glue or other data governance solutions. As noted above, users that have such catalogs may be able to use them via an extension (assuming we can define a good extension API and the third-party solutions can provide the Druid-specific information required).
* Built-in Druid solution: this proposal.

Since the catalog will become core to Druid, we tend to favor creating one focused on the rather unique needs which Druid has.

## Development Phases

This project has multiple parts. A basic plan is:

* [Planner test framework](https://github.com/apache/druid/pull/12545), to allow safe revision of the planner.
* Input tables for INSERT queries. Creates the basic DB, RPC and other mechanisms.
* Datasource properties for non-rollup datasources. (Partitioning, clustering, etc.)
* Rollup support. (This is the hard part.)

## Backward Compatibility

Existing Druid installations will create the new catalog table upon upgrade. It will start empty. If a datasource has no metadata, then Druid will behave exactly as it did before the upgrade. If a version with the catalog is downgraded, the old Druid version will simply ignore the catalog, and the user must explicitly provide the properties formerly provided by the catalog.
