gianm opened a new issue #11929: URL: https://github.com/apache/druid/issues/11929
Everyone likes doing things with SQL, so let's make it possible to do batch ingestion using SQL INSERT! I'd like to make it possible to write an [INSERT INTO .... SELECT](https://support.microsoft.com/en-us/office/insert-into-statement-e12fd58b-33b1-479c-8221-e9f245ab8d24) query like this:

```sql
INSERT INTO tbl
SELECT
  TIME_PARSE("timestamp") AS __time,
  channel,
  cityName,
  countryName
FROM TABLE(
  EXTERN(
    '{"type": "s3", "uris": ["s3://bucket/file"]}',
    '{"type": "json"}',
    '[{"name": "channel", "type": "string"}, {"name": "cityName", "type": "string"}, {"name": "countryName", "type": "string"}, {"name": "timestamp", "type": "string"}]'
  )
)
BUCKET BY FLOOR(__time TO DAY)
ORDER BY channel
```

Since this work may take some time to execute, there will need to be some sort of asynchronous results API. I'm thinking a good choice would be to return an ingestion task ID immediately, so the standard Druid task APIs can be used to check its status. So the response would look like this (with the object result format):

```json
[{"taskId": "xyzzy"}]
```

Some thoughts about the pieces of that query.

```
INSERT INTO tbl
```

In Druid there is not really a "create datasource" concept, or a datasource-wide schema. Datasources exist when they have data, and their schema is whatever data happened to get loaded. Creating a new datasource and loading more data into an existing one use the same API. So I suggest we carry those semantics over to SQL, and do ingestions (both new-table and existing-table) with the "INSERT" command.

It's possible that at some point we'll want to introduce a datasource-wide schema (or partial schema), or add the ability to create empty datasources. At that point it would make sense to also add a "CREATE TABLE" command to SQL. But I suggest we start with a versatile "INSERT".

---

```
SELECT
  TIME_PARSE("timestamp") AS __time,
  channel,
  cityName,
  countryName
```

The SELECT column list would become the columns that get ingested.
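Returning to the asynchronous results API described above: a client would submit the INSERT through the SQL endpoint and then poll the standard task-status API. A minimal Python sketch, assuming the existing `/druid/v2/sql` and `/druid/indexer/v1/task/{id}/status` endpoints and the proposed `[{"taskId": ...}]` response shape (the helper names here are hypothetical, not part of any API):

```python
import json

# Hypothetical client-side flow for the proposed asynchronous INSERT API.
# The SQL endpoint and task-status endpoint are existing Druid APIs; the
# [{"taskId": ...}] response shape is the one proposed in this issue.

def build_sql_payload(sql: str) -> str:
    """Build the JSON body for a POST to /druid/v2/sql."""
    return json.dumps({"query": sql, "resultFormat": "object"})

def parse_task_id(response_body: str) -> str:
    """Extract the ingestion task ID from the proposed object-format response."""
    rows = json.loads(response_body)
    return rows[0]["taskId"]

def status_url(router: str, task_id: str) -> str:
    """URL for polling the task with the standard Druid task-status API."""
    return f"{router}/druid/indexer/v1/task/{task_id}/status"

# Example, using the response shape from the proposal:
task_id = parse_task_id('[{"taskId": "xyzzy"}]')
print(task_id)  # xyzzy
print(status_url("http://localhost:8888", task_id))
```

The point of the sketch is just that nothing new is needed on the client beyond parsing the task ID; everything after submission reuses the existing task APIs.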
---

```
FROM TABLE(
  EXTERN(
    '{"type": "s3", "uris": ["s3://bucket/file"]}',
    '{"type": "json"}',
    '[{"name": "channel", "type": "string"}, {"name": "cityName", "type": "string"}, {"name": "countryName", "type": "string"}, {"name": "timestamp", "type": "string"}]'
  )
)
```

We need some way to reference external data. I suggest we start with a table function that accepts an [input source](https://druid.apache.org/docs/latest/ingestion/native-batch.html#input-sources) and [input format](https://druid.apache.org/docs/latest/ingestion/data-formats.html#input-format). This example uses an S3 input source and a JSON input format.

The "EXTERN" function in this example also accepts a row signature. That's because the SQL planner will need column name and type information in order to validate and plan a query. I think this is OK at first, but at some point I'd like to make it possible to discover this information at runtime.

At some point it'd be nice to have the syntax here be more SQL-y (instead of having this embedded JSON). I think it'd be possible to do that by adding a bunch of new table functions alongside the existing input sources and formats. But I thought it'd be good to start with this generic one.

---

```
BUCKET BY FLOOR(__time TO DAY)
```

We need some way to specify segment granularity. Adding a "BUCKET BY" keyword seems like a natural way to do this.

---

```
ORDER BY channel
```

We need some way to specify how segments are partitioned, and how rows are ordered within segments. Using "ORDER BY" seems like a natural way to do this, especially for range-based partitioning. (For hash, I'm imagining something like `ORDER BY HASH(foo, bar)`.) In my experience, it's a good idea to partition and order-within-partitions using the same key, so I think it's OK to have both controlled by "ORDER BY". But if we needed to support them using different keys, I could imagine introducing a "PARTITION BY" in addition to "ORDER BY".
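Until a more SQL-y syntax exists, the embedded JSON arguments to EXTERN don't have to be written by hand. Here is a small Python sketch that renders the `TABLE(EXTERN(...))` clause from structured specs, handling SQL string-literal quoting (the helper names are hypothetical; the spec fields mirror the documented input source and input format):

```python
import json

# Sketch of generating the EXTERN call from structured objects instead of
# hand-written embedded JSON. The three-argument EXTERN signature is the
# one proposed in this issue; the helper functions are illustrative only.

def sql_string_literal(value: str) -> str:
    """Quote a string as a SQL literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"

def extern_clause(input_source: dict, input_format: dict, signature: list) -> str:
    """Render TABLE(EXTERN(...)) with the three JSON arguments."""
    args = ",\n    ".join(
        sql_string_literal(json.dumps(spec))
        for spec in (input_source, input_format, signature)
    )
    return f"TABLE(\n  EXTERN(\n    {args}\n  )\n)"

clause = extern_clause(
    {"type": "s3", "uris": ["s3://bucket/file"]},
    {"type": "json"},
    [{"name": "channel", "type": "string"},
     {"name": "timestamp", "type": "string"}],
)
print(clause)
```

Serializing from objects also sidesteps escaping mistakes when an input source spec itself contains quotes.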
---

### Proposed changes

Specific proposed changes:

1) Add parser and validator support for INSERT, including the ability to authorize using WRITE DATASOURCE permissions.
2) Add an EXTERN table function and an "external" DataSource type that represents external data. The "external" DataSource would be used by the SQL layer to represent ingestion sources, and would be used to help generate ingestion tasks, but it would not be understood by the native query execution system.
3) Structure planning such that only Scan and GroupBy are used as the native query types for INSERT. (Scan represents ingestion without rollup; GroupBy represents ingestion with rollup.)
4) Add an "orderBy" parameter to the Scan query to encapsulate the "ORDER BY" SQL clause. (This doesn't need to be executable in the native query system at first, but it'd be great to make that possible too at some point.)
5) Split QueryMaker into an interface so there can be one implementation that executes SELECT queries and one implementation that executes INSERT queries.
6) Add an INSERT-oriented QueryMaker that runs Scan and GroupBy queries as batch ingestion tasks. Virtual columns are like [transformSpec](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#transformspec), aggregation functions are like [metricsSpec](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#metricsspec), GROUP BY is like [dimensionsSpec](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#dimensionsspec) with rollup, BUCKET BY is like [segmentGranularity](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html#granularityspec), etc.

### Whatabouts

*What about UPDATE, DELETE, and ALTER TABLE?*

Those would be cool too. I think they would be great as future work. UPDATE would be a good way to trigger reindexing jobs that modify actual row values, and ALTER TABLE would be a good way to trigger reindexing jobs that modify partitioning or column types.
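To make the correspondences in item 6 concrete, here is a rough Python sketch of how pieces of an INSERT query might map onto the documented ingestion-spec fields. The function is purely illustrative; a real INSERT-oriented QueryMaker would derive these from the planned Scan or GroupBy query, not from loose parameters like these:

```python
# Illustrative mapping from INSERT query pieces to native ingestion-spec
# fields (field names follow the documented ingestion spec; the mapping
# mirrors item 6 above and is a sketch, not the actual planner logic).

def ingestion_spec_sketch(bucket_by_granularity, group_by_columns, aggregators):
    return {
        "granularitySpec": {
            # BUCKET BY FLOOR(__time TO DAY) ~ segmentGranularity
            "segmentGranularity": bucket_by_granularity,
            # presence of GROUP BY ~ rollup ingestion (GroupBy native query)
            "rollup": bool(group_by_columns),
        },
        # GROUP BY columns ~ dimensionsSpec
        "dimensionsSpec": {"dimensions": group_by_columns},
        # aggregation functions in the SELECT list ~ metricsSpec
        "metricsSpec": aggregators,
    }

spec = ingestion_spec_sketch("DAY", ["channel"], [{"type": "count", "name": "cnt"}])
print(spec["granularitySpec"]["rollup"])  # True
```

An INSERT without GROUP BY would correspondingly produce `rollup: false` and plan as a Scan query instead of a GroupBy.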
DELETE, if we're clever, could either trigger reindexing jobs or do some metadata-only thing, depending on the parameters of the DELETE.

*What about streaming?*

Calcite (our SQL parser and planning engine) has a bunch of extensions that support streaming SQL: https://calcite.apache.org/docs/stream.html. I haven't studied these yet, but we may be able to use them to extend SQL to support manipulation of streaming supervisors.
