gianm commented on code in PR #12647: URL: https://github.com/apache/druid/pull/12647#discussion_r908092220
########## server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec; +import org.apache.druid.catalog.TableMetadata.TableType; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Datasource metadata exchanged via the REST API and stored + * in the catalog. 
+ */ +public class DatasourceSpec extends TableSpec +{ + /** + * Segment grain at ingestion and initial compaction. Aging rules + * may override the value as segments age. If not provided here, + * then it must be provided at ingestion time. + */ + private final String segmentGranularity; + + /** + * Ingestion and auto-compaction rollup granularity. If null, then no + * rollup is enabled. Same as {@code queryGranularity} in an ingest spec, + * but renamed since this granularity affects rollup, not queries. Can be + * overridden at ingestion time. The grain may change as segments evolve: + * this is the grain only for ingest. + */ + private final String rollupGranularity; + + /** + * The target segment size at ingestion and initial compaction. + * If 0, then the system setting is used. + */ + private final int targetSegmentRows; + + /** + * Whether to enable auto-compaction. Only relevant if no auto-compaction + * spec is defined, since the existence of a spec overrides this setting. + */ + private final boolean enableAutoCompaction; + + /** + * The offset of segments to be auto-compacted relative to the current + * time. If not present, the auto-compaction default is used if + * auto-compaction is enabled. + */ + private final String autoCompactionDelay; + + private final List<DatasourceColumnSpec> columns; + + public DatasourceSpec( + @JsonProperty("segmentGranularity") String segmentGranularity, + @JsonProperty("rollupGranularity") String rollupGranularity, + @JsonProperty("targetSegmentRows") int targetSegmentRows, + @JsonProperty("enableAutoCompaction") boolean enableAutoCompaction, + @JsonProperty("autoCompactionDelay") String autoCompactionDelay, + @JsonProperty("properties") Map<String, Object> properties, Review Comment: When is something a property vs. a top-level, named thing? Do you have any properties in mind? 
########## server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec; +import org.apache.druid.catalog.TableMetadata.TableType; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Datasource metadata exchanged via the REST API and stored + * in the catalog. 
+ */ +public class DatasourceSpec extends TableSpec +{ + /** + * Segment grain at ingestion and initial compaction. Aging rules + * may override the value as segments age. If not provided here, + * then it must be provided at ingestion time. + */ + private final String segmentGranularity; + + /** + * Ingestion and auto-compaction rollup granularity. If null, then no + * rollup is enabled. Same as {@code queryGranularity} in an ingest spec, + * but renamed since this granularity affects rollup, not queries. Can be + * overridden at ingestion time. The grain may change as segments evolve: + * this is the grain only for ingest. + */ + private final String rollupGranularity; + + /** + * The target segment size at ingestion and initial compaction. + * If 0, then the system setting is used. + */ + private final int targetSegmentRows; + + /** + * Whether to enable auto-compaction. Only relevant if no auto-compaction + * spec is defined, since the existence of a spec overrides this setting. + */ + private final boolean enableAutoCompaction; + + /** + * The offset of segments to be auto-compacted relative to the current + * time. If not present, the auto-compaction default is used if + * auto-compaction is enabled. + */ + private final String autoCompactionDelay; + + private final List<DatasourceColumnSpec> columns; + + public DatasourceSpec( + @JsonProperty("segmentGranularity") String segmentGranularity, + @JsonProperty("rollupGranularity") String rollupGranularity, + @JsonProperty("targetSegmentRows") int targetSegmentRows, + @JsonProperty("enableAutoCompaction") boolean enableAutoCompaction, + @JsonProperty("autoCompactionDelay") String autoCompactionDelay, Review Comment: Is this meant to be a replacement or alternative for (certain parts of) DataSourceCompactionConfig? How will we reconcile what's here with any existing DataSourceCompactionConfig? 
########## server/src/main/java/org/apache/druid/catalog/DatasourceColumnSpec.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; + +/** + * Description of a detail datasource column and a rollup + * dimension column. 
+ */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @Type(name = "detail", value = DatasourceColumnSpec.DetailColumnSpec.class), + @Type(name = "dimension", value = DatasourceColumnSpec.DimensionSpec.class), + @Type(name = "measure", value = DatasourceColumnSpec.MeasureSpec.class), +}) +public abstract class DatasourceColumnSpec extends ColumnSpec +{ + private static final String TIME_COLUMN = "__time"; + + @JsonCreator + public DatasourceColumnSpec( + @JsonProperty("name") String name, + @JsonProperty("sqlType") String sqlType + ) + { + super(name, sqlType); + } + + @Override + public void validate() + { + super.validate(); + if (sqlType == null) { + return; + } + if (TIME_COLUMN.equals(name)) { + if (!"TIMESTAMP".equalsIgnoreCase(sqlType)) { + throw new IAE("__time column must have type TIMESTAMP"); + } + } else if (!VALID_SQL_TYPES.containsKey(StringUtils.toUpperCase(sqlType))) { Review Comment: What about complex types? Like, if the aggregator is a sketch aggregator, for example. ########## server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec; +import org.apache.druid.catalog.TableMetadata.TableType; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Datasource metadata exchanged via the REST API and stored + * in the catalog. + */ +public class DatasourceSpec extends TableSpec +{ + /** + * Segment grain at ingestion and initial compaction. Aging rules + * may override the value as segments age. If not provided here, + * then it must be provided at ingestion time. + */ + private final String segmentGranularity; + + /** + * Ingestion and auto-compaction rollup granularity. If null, then no + * rollup is enabled. Same as {@code queryGranularity} in an ingest spec, + * but renamed since this granularity affects rollup, not queries. Can be + * overridden at ingestion time. The grain may change as segments evolve: + * this is the grain only for ingest. + */ + private final String rollupGranularity; + + /** + * The target segment size at ingestion and initial compaction. + * If 0, then the system setting is used. + */ + private final int targetSegmentRows; + + /** + * Whether to enable auto-compaction. Only relevant if no auto-compaction + * spec is defined, since the existence of a spec overrides this setting. 
+ */ + private final boolean enableAutoCompaction; + + /** + * The offset of segments to be auto-compacted relative to the current + * time. If not present, the auto-compaction default is used if + * auto-compaction is enabled. + */ + private final String autoCompactionDelay; + + private final List<DatasourceColumnSpec> columns; + + public DatasourceSpec( + @JsonProperty("segmentGranularity") String segmentGranularity, Review Comment: Any reason this is a String, and not a Granularity? ########## server/src/main/java/org/apache/druid/server/http/CatalogResource.java: ########## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.google.common.base.Strings; +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.druid.catalog.Actions; +import org.apache.druid.catalog.CatalogStorage; +import org.apache.druid.catalog.SchemaRegistry.SchemaSpec; +import org.apache.druid.catalog.TableId; +import org.apache.druid.catalog.TableMetadata; +import org.apache.druid.catalog.TableSpec; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.catalog.CatalogManager; +import org.apache.druid.metadata.catalog.CatalogManager.DuplicateKeyException; +import org.apache.druid.metadata.catalog.CatalogManager.NotFoundException; +import org.apache.druid.metadata.catalog.CatalogManager.OutOfDateException; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.server.security.ResourceType; + +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * REST endpoint for user and internal catalog actions. Catalog actions + * occur at the global level (all schemas), the schema level, or the + * table level. + * + * @see {@link CatalogListenerResource} for the client-side API. 
+ */ +@Path(CatalogResource.ROOT_PATH) +public class CatalogResource +{ + public static final String ROOT_PATH = "/druid/coordinator/v1/catalog"; + + private final CatalogStorage catalog; + + @Inject + public CatalogResource(CatalogStorage catalog) + { + this.catalog = catalog; + } + + /** + * Create a new table within the indicated schema. + * + * @param table The table specification to create. + * @param ifNew Whether to skip the action if the table already exists. + * This is the same as the SQL IF NOT EXISTS clause. If {@code false}, + * then an error is raised if the table exists. If {@code true}, then + * the action silently does nothing if the table exists. Primarily for + * use in scripts. + * @param req the HTTP request used for authorization. + * @return the version number of the table + */ + @POST + @Path("/tables") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response createTable( + TableMetadata table, + @QueryParam("ifnew") boolean ifNew, + @Context final HttpServletRequest req) + { + String dbSchema = table.resolveDbSchema(); + Pair<Response, SchemaSpec> result = validateSchema(dbSchema); + if (result.lhs != null) { + return result.lhs; + } + SchemaSpec schema = result.rhs; + if (!schema.writable()) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format("Cannot create tables in schema %s", dbSchema)); + } + table = table.withSchema(dbSchema); + try { + table.validate(); + } + catch (IAE e) { + return Actions.badRequest(Actions.INVALID, e.getMessage()); + } + TableSpec spec = table.spec(); + if (!schema.accepts(spec)) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format( + "Cannot create tables of type %s in schema %s", + spec == null ? 
"null" : spec.getClass().getSimpleName(), + dbSchema)); + } + try { + catalog.authorizer().authorizeTable(schema, table.name(), Action.WRITE, req); + } + catch (ForbiddenException e) { + return Actions.forbidden(e); + } + try { + long createVersion = catalog.tables().create(table); + return Actions.okWithVersion(createVersion); + } + catch (DuplicateKeyException e) { + if (!ifNew) { + return Actions.badRequest( + Actions.DUPLICATE_ERROR, + StringUtils.format( + "A table of name %s.%s aleady exists", + table.dbSchema(), + table.name())); + } else { + return Actions.okWithVersion(0); + } + } + catch (Exception e) { + return Actions.exception(e); + } + } + + /** + * Update a table within the given schema. + * + * @param dbSchema The name of the Druid schema, which must be writable + * and the user must have at least read access. + * @param name The name of the table definition to modify. The user must + * have write access to the table. + * @param spec The new table definition. + * @param version An optional table version. If provided, the metadata DB + * entry for the table must be at this exact version or the update + * will fail. (Provides "optimistic locking.") If omitted (that is, + * if zero), then no update conflict change is done. + * @param req the HTTP request used for authorization. + * @return the new version number of the table + */ + @POST + @Path("/tables/{dbSchema}/{name}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response updateTableDefn( + @PathParam("dbSchema") String dbSchema, + @PathParam("name") String name, + TableSpec spec, + @QueryParam("version") long version, Review Comment: The version thing is cool. I'm a fan of this sort of thing in CRUD APIs. ########## server/src/main/java/org/apache/druid/catalog/InputTableSpec.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import org.apache.druid.catalog.TableMetadata.TableType; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.java.util.common.IAE; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Definition of an external input source, primarily for ingestion. + * The components are derived from those for Druid ingestion: an + * input source, a format and a set of columns. Also provides + * properties, as do all table definitions. 
+ */ +public class InputTableSpec extends TableSpec +{ + private final InputSource inputSource; + private final InputFormat format; + private final List<InputColumnSpec> columns; + + public InputTableSpec( + @JsonProperty("inputSource") InputSource inputSource, + @JsonProperty("format") InputFormat format, + @JsonProperty("columns") List<InputColumnSpec> columns, + @JsonProperty("properties") Map<String, Object> properties + ) + { + super(properties); + this.inputSource = inputSource; + this.format = format; + this.columns = columns; + } + + @Override + public TableType type() + { + return TableType.INPUT; + } + + @JsonProperty("inputSource") + public InputSource inputSource() Review Comment: Will there be a way to parameterize the source somehow at runtime? I feel that a prime use case for input tables is going to be incremental ingestion, meaning the input source will change from statement to statement. ########## server/src/main/java/org/apache/druid/server/http/CatalogResource.java: ########## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.google.common.base.Strings; +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.druid.catalog.Actions; +import org.apache.druid.catalog.CatalogStorage; +import org.apache.druid.catalog.SchemaRegistry.SchemaSpec; +import org.apache.druid.catalog.TableId; +import org.apache.druid.catalog.TableMetadata; +import org.apache.druid.catalog.TableSpec; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.catalog.CatalogManager; +import org.apache.druid.metadata.catalog.CatalogManager.DuplicateKeyException; +import org.apache.druid.metadata.catalog.CatalogManager.NotFoundException; +import org.apache.druid.metadata.catalog.CatalogManager.OutOfDateException; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.server.security.ResourceType; + +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * REST endpoint for user and internal catalog actions. Catalog actions + * occur at the global level (all schemas), the schema level, or the + * table level. + * + * @see {@link CatalogListenerResource} for the client-side API. 
+ */ +@Path(CatalogResource.ROOT_PATH) +public class CatalogResource +{ + public static final String ROOT_PATH = "/druid/coordinator/v1/catalog"; + + private final CatalogStorage catalog; + + @Inject + public CatalogResource(CatalogStorage catalog) + { + this.catalog = catalog; + } + + /** + * Create a new table within the indicated schema. + * + * @param table The table specification to create. + * @param ifNew Whether to skip the action if the table already exists. + * This is the same as the SQL IF NOT EXISTS clause. If {@code false}, + * then an error is raised if the table exists. If {@code true}, then + * the action silently does nothing if the table exists. Primarily for + * use in scripts. + * @param req the HTTP request used for authorization. + * @return the version number of the table + */ + @POST + @Path("/tables") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response createTable( + TableMetadata table, + @QueryParam("ifnew") boolean ifNew, + @Context final HttpServletRequest req) + { + String dbSchema = table.resolveDbSchema(); + Pair<Response, SchemaSpec> result = validateSchema(dbSchema); + if (result.lhs != null) { + return result.lhs; + } + SchemaSpec schema = result.rhs; + if (!schema.writable()) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format("Cannot create tables in schema %s", dbSchema)); + } + table = table.withSchema(dbSchema); + try { + table.validate(); + } + catch (IAE e) { + return Actions.badRequest(Actions.INVALID, e.getMessage()); + } + TableSpec spec = table.spec(); + if (!schema.accepts(spec)) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format( + "Cannot create tables of type %s in schema %s", + spec == null ? 
"null" : spec.getClass().getSimpleName(), + dbSchema)); + } + try { + catalog.authorizer().authorizeTable(schema, table.name(), Action.WRITE, req); + } + catch (ForbiddenException e) { + return Actions.forbidden(e); + } + try { + long createVersion = catalog.tables().create(table); + return Actions.okWithVersion(createVersion); + } + catch (DuplicateKeyException e) { + if (!ifNew) { + return Actions.badRequest( + Actions.DUPLICATE_ERROR, + StringUtils.format( + "A table of name %s.%s aleady exists", + table.dbSchema(), + table.name())); + } else { + return Actions.okWithVersion(0); Review Comment: Should this return the current version of the already-existing table? ########## server/src/main/java/org/apache/druid/catalog/DatasourceColumnSpec.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; + +/** + * Description of a detail datasource column and a rollup + * dimension column. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @Type(name = "detail", value = DatasourceColumnSpec.DetailColumnSpec.class), + @Type(name = "dimension", value = DatasourceColumnSpec.DimensionSpec.class), + @Type(name = "measure", value = DatasourceColumnSpec.MeasureSpec.class), +}) +public abstract class DatasourceColumnSpec extends ColumnSpec +{ + private static final String TIME_COLUMN = "__time"; + + @JsonCreator + public DatasourceColumnSpec( + @JsonProperty("name") String name, + @JsonProperty("sqlType") String sqlType + ) + { + super(name, sqlType); + } + + @Override + public void validate() + { + super.validate(); + if (sqlType == null) { + return; + } + if (TIME_COLUMN.equals(name)) { + if (!"TIMESTAMP".equalsIgnoreCase(sqlType)) { + throw new IAE("__time column must have type TIMESTAMP"); + } + } else if (!VALID_SQL_TYPES.containsKey(StringUtils.toUpperCase(sqlType))) { + throw new IAE("Not a supported SQL type: " + sqlType); + } + } + + public static class DetailColumnSpec extends DatasourceColumnSpec + { + @JsonCreator + public DetailColumnSpec( + @JsonProperty("name") String name, + @JsonProperty("sqlType") String sqlType + ) + { + super(name, sqlType); + } + + @Override + protected ColumnKind kind() + { + return ColumnKind.DETAIL; + } + } + + public static class DimensionSpec extends DatasourceColumnSpec + { + @JsonCreator + public DimensionSpec( + @JsonProperty("name") String name, + 
@JsonProperty("sqlType") String sqlType + ) + { + super(name, sqlType); + } + + @Override + protected ColumnKind kind() + { + return ColumnKind.DIMENSION; + } + } + + /** + * Catalog definition of a measure (metric) column. + */ + public static class MeasureSpec extends DatasourceColumnSpec + { + private final String aggregateFn; + + @JsonCreator + public MeasureSpec( + @JsonProperty("name") String name, + @JsonProperty("sqlType") String sqlType, + @JsonProperty("aggregateFn") String aggregateFn Review Comment: What kind of string is meant to be in here? How are we going to interpret it? I'm especially interested in this because I've been contemplating recently what's the best way to write an INSERT or REPLACE into a rollup table. ########## server/src/main/java/org/apache/druid/catalog/DatasourceColumnSpec.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; + +/** + * Description of a detail datasource column and a rollup + * dimension column. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @Type(name = "detail", value = DatasourceColumnSpec.DetailColumnSpec.class), Review Comment: Is 'detail' a standard (de jure or de facto) term? ########## server/src/main/java/org/apache/druid/server/http/CatalogResource.java: ########## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.google.common.base.Strings; +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.druid.catalog.Actions; +import org.apache.druid.catalog.CatalogStorage; +import org.apache.druid.catalog.SchemaRegistry.SchemaSpec; +import org.apache.druid.catalog.TableId; +import org.apache.druid.catalog.TableMetadata; +import org.apache.druid.catalog.TableSpec; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.catalog.CatalogManager; +import org.apache.druid.metadata.catalog.CatalogManager.DuplicateKeyException; +import org.apache.druid.metadata.catalog.CatalogManager.NotFoundException; +import org.apache.druid.metadata.catalog.CatalogManager.OutOfDateException; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.server.security.ResourceType; + +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * REST endpoint for user and internal catalog actions. Catalog actions + * occur at the global level (all schemas), the schema level, or the + * table level. + * + * @see {@link CatalogListenerResource} for the client-side API. 
+ */ +@Path(CatalogResource.ROOT_PATH) +public class CatalogResource +{ + public static final String ROOT_PATH = "/druid/coordinator/v1/catalog"; + + private final CatalogStorage catalog; + + @Inject + public CatalogResource(CatalogStorage catalog) + { + this.catalog = catalog; + } + + /** + * Create a new table within the indicated schema. + * + * @param table The table specification to create. + * @param ifNew Whether to skip the action if the table already exists. + * This is the same as the SQL IF NOT EXISTS clause. If {@code false}, + * then an error is raised if the table exists. If {@code true}, then + * the action silently does nothing if the table exists. Primarily for + * use in scripts. + * @param req the HTTP request used for authorization. + * @return the version number of the table + */ + @POST + @Path("/tables") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response createTable( + TableMetadata table, + @QueryParam("ifnew") boolean ifNew, + @Context final HttpServletRequest req) + { + String dbSchema = table.resolveDbSchema(); + Pair<Response, SchemaSpec> result = validateSchema(dbSchema); + if (result.lhs != null) { + return result.lhs; + } + SchemaSpec schema = result.rhs; + if (!schema.writable()) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format("Cannot create tables in schema %s", dbSchema)); + } + table = table.withSchema(dbSchema); + try { + table.validate(); + } + catch (IAE e) { + return Actions.badRequest(Actions.INVALID, e.getMessage()); + } + TableSpec spec = table.spec(); + if (!schema.accepts(spec)) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format( + "Cannot create tables of type %s in schema %s", + spec == null ? 
"null" : spec.getClass().getSimpleName(), + dbSchema)); + } + try { + catalog.authorizer().authorizeTable(schema, table.name(), Action.WRITE, req); Review Comment: The authorization check should come earlier (ideally, as the first thing the method does). ########## server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec; +import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec; +import org.apache.druid.catalog.TableMetadata.TableType; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Datasource metadata exchanged via the REST API and stored + * in the catalog. + */ +public class DatasourceSpec extends TableSpec +{ + /** + * Segment grain at ingestion and initial compaction. Aging rules + * may override the value as segments age. If not provided here, + * then it must be provided at ingestion time. + */ + private final String segmentGranularity; + + /** + * Ingestion and auto-compaction rollup granularity. If null, then no + * rollup is enabled. Same as {@code queryGranularity} in and ingest spec, + * but renamed since this granularity affects rollup, not queries. Can be + * overridden at ingestion time. The grain may change as segments evolve: + * this is the grain only for ingest. + */ + private final String rollupGranularity; + + /** + * The target segment size at ingestion and initial compaction. + * If 0, then the system setting is used. + */ + private final int targetSegmentRows; + + /** + * Whether to enable auto-compaction. Only relevant if no auto-compaction + * spec is defined, since the existence of a spec overrides this setting. 
+ */ + private final boolean enableAutoCompaction; + + /** + * The offset of segments to be auto-compacted relative to the current + * time. If not present, the auto-compaction default is used if + * auto-compaction is enabled. + */ + private final String autoCompactionDelay; + + private final List<DatasourceColumnSpec> columns; + + public DatasourceSpec( + @JsonProperty("segmentGranularity") String segmentGranularity, + @JsonProperty("rollupGranularity") String rollupGranularity, Review Comment: Generally we call this `queryGranularity`. But! What do you think about getting rid of it? I've started to think that it makes more sense to represent this as an explicit `TIME_FLOOR` function call rather than a table property. ########## server/src/main/java/org/apache/druid/catalog/Actions.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.server.security.ForbiddenException; + +import javax.ws.rs.core.Response; + +import java.util.Map; + +/** + * Helper functions for the catalog REST API actions. 
+ */ +public class Actions Review Comment: Are you thinking this class would be useful for other server APIs in the future? It seems written in such a way that it would be. ########## server/src/main/java/org/apache/druid/catalog/InputTableSpec.java: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import org.apache.druid.catalog.TableMetadata.TableType; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.java.util.common.IAE; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Definition of an external input source, primarily for ingestion. + * The components are derived from those for Druid ingestion: an + * input source, a format and a set of columns. Also provides + * properties, as do all table definitions. 
+ */ +public class InputTableSpec extends TableSpec +{ + private final InputSource inputSource; + private final InputFormat format; + private final List<InputColumnSpec> columns; + + public InputTableSpec( + @JsonProperty("inputSource") InputSource inputSource, + @JsonProperty("format") InputFormat format, Review Comment: I'd go with `inputSource` + `inputFormat`, for rhyming purposes, & because it's what indexing tasks and ExternalDataSource do. ########## server/src/main/java/org/apache/druid/catalog/ColumnSpec.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.column.ColumnType; + +import java.util.Map; + +/** + * Base class for table columns. Columns have multiple types + * represented as subclasses. 
+ */ +@PublicApi Review Comment: IMO it'd be better to tag these as `@UnstableApi` rather than `@PublicApi`, since we'll likely be evolving these in the early days of the catalog as the use cases become more clear, and we want to signal to extension authors that they should plan to track our work closely if they are using these classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
