zachjsh commented on code in PR #13165: URL: https://github.com/apache/druid/pull/13165#discussion_r1004766678
########## extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/http/CatalogResource.java: ########## @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.http; + +import com.google.common.base.Strings; +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.druid.catalog.model.ColumnSpec; +import org.apache.druid.catalog.model.SchemaRegistry.SchemaSpec; +import org.apache.druid.catalog.model.TableDefnRegistry; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.TableSpec; +import org.apache.druid.catalog.model.table.AbstractDatasourceDefn; +import org.apache.druid.catalog.storage.Actions; +import org.apache.druid.catalog.storage.CatalogStorage; +import org.apache.druid.catalog.storage.HideColumns; +import org.apache.druid.catalog.storage.MoveColumn; +import org.apache.druid.catalog.storage.MoveColumn.Position; +import org.apache.druid.catalog.storage.sql.CatalogManager.DuplicateKeyException; +import org.apache.druid.catalog.storage.sql.CatalogManager.NotFoundException; +import 
org.apache.druid.catalog.storage.sql.CatalogManager.OutOfDateException; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthorizationUtils; +import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.server.security.ResourceType; + +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +/** + * REST endpoint for user and internal catalog actions. Catalog actions + * occur at the global level (all schemas), the schema level, or the + * table level. + * + * @see {@link CatalogListenerResource} for the client-side API. + */ +@Path(CatalogResource.ROOT_PATH) +public class CatalogResource +{ + public static final String ROOT_PATH = "/druid/coordinator/v1/catalog"; + + private final CatalogStorage catalog; + + @Inject + public CatalogResource(final CatalogStorage catalog) + { + this.catalog = catalog; + } + + private enum PostAction + { + NEW, + IFNEW, + REPLACE, + FORCE; + } + + /** + * Create a new table containing the given table specification. + * + * @param dbSchema The name of the Druid schema, which must be writable + * and the user must have at least read access. + * @param name The name of the table definition to modify. 
The user must + * have write access to the table. + * @param spec The new table definition. + * @param actionParam What to do if the table already exists. + * {@code ifNew} is the same as the SQL IF NOT EXISTS clause. If {@code new}, + * then an error is raised if the table exists. If {@code ifNew}, then + * the action silently does nothing if the table exists. Primarily for + * use in scripts. The other two options are primarily for use in tests. + * @param req the HTTP request used for authorization. + * @return the version number of the table + */ + @POST + @Path("/tables/{dbSchema}/{name}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response postTable( + @PathParam("dbSchema") String dbSchema, + @PathParam("name") String name, + TableSpec spec, + @QueryParam("action") String actionParam, + @QueryParam("version") long version, + @Context final HttpServletRequest req + ) + { + final PostAction action; + if (actionParam == null) { + action = PostAction.NEW; + } else { + action = PostAction.valueOf(StringUtils.toUpperCase(actionParam)); + if (action == null) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format( + "Not a valid action: [%s]. 
Valid actions are new, ifNew, replace, force", + actionParam + ) + ); + } + } + TableId tableId = TableId.of(dbSchema, name); + Response response = authorizeTable(tableId, spec, req); + if (response != null) { + return response; + } + TableMetadata table = TableMetadata.newTable(tableId, spec); + try { + catalog.validate(table); + } + catch (IAE e) { + return Actions.badRequest(Actions.INVALID, e.getMessage()); + } + + switch (action) { + case NEW: + return insertTableSpec(table, false); + case IFNEW: + return insertTableSpec(table, true); + case REPLACE: + return updateTableSpec(table, version); + case FORCE: + return addOrUpdateTableSpec(table); + default: + throw new ISE("Unknown action."); + } + } + + private Response authorizeTable(TableId tableId, TableSpec spec, final HttpServletRequest req) + { + // Druid has a fixed set of schemas. Ensure the one provided is valid. + Pair<Response, SchemaSpec> result = validateSchema(tableId.schema()); + if (result.lhs != null) { + return result.lhs; + } + SchemaSpec schema = result.rhs; + + // The schema has to be one that allows table definitions. + if (!schema.writable()) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format("Cannot modify schema %s", tableId.schema()) + ); + } + + // Table name can't be blank or have spaces + if (Strings.isNullOrEmpty(tableId.name())) { + return Actions.badRequest(Actions.INVALID, "Table name is required"); + } + if (!tableId.name().equals(tableId.name().trim())) { + return Actions.badRequest(Actions.INVALID, "Table name cannot start or end with spaces"); + } + + // The user has to have permission to modify the table. + try { + catalog.authorizer().authorizeTable(schema, tableId.name(), Action.WRITE, req); + } + catch (ForbiddenException e) { + return Actions.forbidden(e); + } + + // Validate the spec, if provided. + if (spec != null) { + + // The given table spec has to be valid for the given schema. 
+ if (Strings.isNullOrEmpty(spec.type())) { + return Actions.badRequest(Actions.INVALID, "Table type is required"); + } + + if (!schema.accepts(spec.type())) { + return Actions.badRequest( + Actions.INVALID, + StringUtils.format( + "Cannot create tables of type %s in schema %s", + spec.getClass().getSimpleName(), + tableId.schema() + ) + ); + } + } + + // Everything checks out, let the request proceed. + return null; + } + + private Response insertTableSpec(TableMetadata table, boolean ifNew) + { + try { + long createVersion = catalog.tables().create(table); + return Actions.okWithVersion(createVersion); + } + catch (DuplicateKeyException e) { + if (!ifNew) { + return Actions.badRequest( + Actions.DUPLICATE_ERROR, + StringUtils.format( + "A table of name %s already exists", + table.id().sqlName() + ) + ); + } else { + return Actions.okWithVersion(0); Review Comment: It seems weird to return a version of 0 here. Why not return the existing version? Maybe the `create` method called above should have an overload that takes `ifNew` and returns the current version if the table already exists? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
