gianm commented on code in PR #12647:
URL: https://github.com/apache/druid/pull/12647#discussion_r908092220


##########
server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec;
+import org.apache.druid.catalog.TableMetadata.TableType;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Datasource metadata exchanged via the REST API and stored
+ * in the catalog.
+ */
+public class DatasourceSpec extends TableSpec
+{
+  /**
+   * Segment grain at ingestion and initial compaction. Aging rules
+   * may override the value as segments age. If not provided here,
+   * then it must be provided at ingestion time.
+   */
+  private final String segmentGranularity;
+
+  /**
+   * Ingestion and auto-compaction rollup granularity. If null, then no
+   * rollup is enabled. Same as {@code queryGranularity} in an ingest spec,
+   * but renamed since this granularity affects rollup, not queries. Can be
+   * overridden at ingestion time. The grain may change as segments evolve:
+   * this is the grain only for ingest.
+   */
+  private final String rollupGranularity;
+
+  /**
+   * The target segment size at ingestion and initial compaction.
+   * If 0, then the system setting is used.
+   */
+  private final int targetSegmentRows;
+
+  /**
+   * Whether to enable auto-compaction. Only relevant if no auto-compaction
+   * spec is defined, since the existence of a spec overrides this setting.
+   */
+  private final boolean enableAutoCompaction;
+
+  /**
+   * The offset of segments to be auto-compacted relative to the current
+   * time. If not present, the auto-compaction default is used if
+   * auto-compaction is enabled.
+   */
+  private final String autoCompactionDelay;
+
+  private final List<DatasourceColumnSpec> columns;
+
+  public DatasourceSpec(
+      @JsonProperty("segmentGranularity") String segmentGranularity,
+      @JsonProperty("rollupGranularity") String rollupGranularity,
+      @JsonProperty("targetSegmentRows") int targetSegmentRows,
+      @JsonProperty("enableAutoCompaction") boolean enableAutoCompaction,
+      @JsonProperty("autoCompactionDelay") String autoCompactionDelay,
+      @JsonProperty("properties") Map<String, Object> properties,

Review Comment:
   When is something a property vs. a top-level, named thing? Do you have any 
properties in mind?



##########
server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec;
+import org.apache.druid.catalog.TableMetadata.TableType;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Datasource metadata exchanged via the REST API and stored
+ * in the catalog.
+ */
+public class DatasourceSpec extends TableSpec
+{
+  /**
+   * Segment grain at ingestion and initial compaction. Aging rules
+   * may override the value as segments age. If not provided here,
+   * then it must be provided at ingestion time.
+   */
+  private final String segmentGranularity;
+
+  /**
+   * Ingestion and auto-compaction rollup granularity. If null, then no
+   * rollup is enabled. Same as {@code queryGranularity} in an ingest spec,
+   * but renamed since this granularity affects rollup, not queries. Can be
+   * overridden at ingestion time. The grain may change as segments evolve:
+   * this is the grain only for ingest.
+   */
+  private final String rollupGranularity;
+
+  /**
+   * The target segment size at ingestion and initial compaction.
+   * If 0, then the system setting is used.
+   */
+  private final int targetSegmentRows;
+
+  /**
+   * Whether to enable auto-compaction. Only relevant if no auto-compaction
+   * spec is defined, since the existence of a spec overrides this setting.
+   */
+  private final boolean enableAutoCompaction;
+
+  /**
+   * The offset of segments to be auto-compacted relative to the current
+   * time. If not present, the auto-compaction default is used if
+   * auto-compaction is enabled.
+   */
+  private final String autoCompactionDelay;
+
+  private final List<DatasourceColumnSpec> columns;
+
+  public DatasourceSpec(
+      @JsonProperty("segmentGranularity") String segmentGranularity,
+      @JsonProperty("rollupGranularity") String rollupGranularity,
+      @JsonProperty("targetSegmentRows") int targetSegmentRows,
+      @JsonProperty("enableAutoCompaction") boolean enableAutoCompaction,
+      @JsonProperty("autoCompactionDelay") String autoCompactionDelay,

Review Comment:
   Is this meant to be a replacement or alternative for (certain parts of) 
DataSourceCompactionConfig?
   
   How will we reconcile what's here with any existing 
DataSourceCompactionConfig?



##########
server/src/main/java/org/apache/druid/catalog/DatasourceColumnSpec.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Description of a detail datasource column and a rollup
+ * dimension column.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = "detail", value = 
DatasourceColumnSpec.DetailColumnSpec.class),
+    @Type(name = "dimension", value = 
DatasourceColumnSpec.DimensionSpec.class),
+    @Type(name = "measure", value = DatasourceColumnSpec.MeasureSpec.class),
+})
+public abstract class DatasourceColumnSpec extends ColumnSpec
+{
+  private static final String TIME_COLUMN = "__time";
+
+  @JsonCreator
+  public DatasourceColumnSpec(
+      @JsonProperty("name") String name,
+      @JsonProperty("sqlType") String sqlType
+  )
+  {
+    super(name, sqlType);
+  }
+
+  @Override
+  public void validate()
+  {
+    super.validate();
+    if (sqlType == null) {
+      return;
+    }
+    if (TIME_COLUMN.equals(name)) {
+      if (!"TIMESTAMP".equalsIgnoreCase(sqlType)) {
+        throw new IAE("__time column must have type TIMESTAMP");
+      }
+    } else if (!VALID_SQL_TYPES.containsKey(StringUtils.toUpperCase(sqlType))) 
{

Review Comment:
   What about complex types? For example, what if the aggregator is a sketch 
aggregator?



##########
server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec;
+import org.apache.druid.catalog.TableMetadata.TableType;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Datasource metadata exchanged via the REST API and stored
+ * in the catalog.
+ */
+public class DatasourceSpec extends TableSpec
+{
+  /**
+   * Segment grain at ingestion and initial compaction. Aging rules
+   * may override the value as segments age. If not provided here,
+   * then it must be provided at ingestion time.
+   */
+  private final String segmentGranularity;
+
+  /**
+   * Ingestion and auto-compaction rollup granularity. If null, then no
+   * rollup is enabled. Same as {@code queryGranularity} in an ingest spec,
+   * but renamed since this granularity affects rollup, not queries. Can be
+   * overridden at ingestion time. The grain may change as segments evolve:
+   * this is the grain only for ingest.
+   */
+  private final String rollupGranularity;
+
+  /**
+   * The target segment size at ingestion and initial compaction.
+   * If 0, then the system setting is used.
+   */
+  private final int targetSegmentRows;
+
+  /**
+   * Whether to enable auto-compaction. Only relevant if no auto-compaction
+   * spec is defined, since the existence of a spec overrides this setting.
+   */
+  private final boolean enableAutoCompaction;
+
+  /**
+   * The offset of segments to be auto-compacted relative to the current
+   * time. If not present, the auto-compaction default is used if
+   * auto-compaction is enabled.
+   */
+  private final String autoCompactionDelay;
+
+  private final List<DatasourceColumnSpec> columns;
+
+  public DatasourceSpec(
+      @JsonProperty("segmentGranularity") String segmentGranularity,

Review Comment:
   Any reason this is a String, and not a Granularity?



##########
server/src/main/java/org/apache/druid/server/http/CatalogResource.java:
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.Actions;
+import org.apache.druid.catalog.CatalogStorage;
+import org.apache.druid.catalog.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.TableId;
+import org.apache.druid.catalog.TableMetadata;
+import org.apache.druid.catalog.TableSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.catalog.CatalogManager;
+import org.apache.druid.metadata.catalog.CatalogManager.DuplicateKeyException;
+import org.apache.druid.metadata.catalog.CatalogManager.NotFoundException;
+import org.apache.druid.metadata.catalog.CatalogManager.OutOfDateException;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  /**
+   * Create a new table within the indicated schema.
+   *
+   * @param table The table specification to create.
+   * @param ifNew Whether to skip the action if the table already exists.
+   *        This is the same as the SQL IF NOT EXISTS clause. If {@code false},
+   *        then an error is raised if the table exists. If {@code true}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response createTable(
+      TableMetadata table,
+      @QueryParam("ifnew") boolean ifNew,
+      @Context final HttpServletRequest req)
+  {
+    String dbSchema = table.resolveDbSchema();
+    Pair<Response, SchemaSpec> result = validateSchema(dbSchema);
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    SchemaSpec schema = result.rhs;
+    if (!schema.writable()) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format("Cannot create tables in schema %s", dbSchema));
+    }
+    table = table.withSchema(dbSchema);
+    try {
+      table.validate();
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+    TableSpec spec = table.spec();
+    if (!schema.accepts(spec)) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format(
+              "Cannot create tables of type %s in schema %s",
+              spec == null ? "null" : spec.getClass().getSimpleName(),
+              dbSchema));
+    }
+    try {
+      catalog.authorizer().authorizeTable(schema, table.name(), Action.WRITE, 
req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+    try {
+      long createVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(createVersion);
+    }
+    catch (DuplicateKeyException e) {
+      if (!ifNew) {
+        return Actions.badRequest(
+              Actions.DUPLICATE_ERROR,
+              StringUtils.format(
+                  "A table of name %s.%s aleady exists",
+                  table.dbSchema(),
+                  table.name()));
+      } else {
+        return Actions.okWithVersion(0);
+      }
+    }
+    catch (Exception e) {
+      return Actions.exception(e);
+    }
+  }
+
+  /**
+   * Update a table within the given schema.
+   *
+   * @param dbSchema The name of the Druid schema, which must be writable
+   *        and the user must have at least read access.
+   * @param name The name of the table definition to modify. The user must
+   *        have write access to the table.
+   * @param spec The new table definition.
+   * @param version An optional table version. If provided, the metadata DB
+   *        entry for the table must be at this exact version or the update
+   *        will fail. (Provides "optimistic locking.") If omitted (that is,
+   *        if zero), then no update conflict check is done.
+   * @param req the HTTP request used for authorization.
+   * @return the new version number of the table
+   */
+  @POST
+  @Path("/tables/{dbSchema}/{name}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response updateTableDefn(
+      @PathParam("dbSchema") String dbSchema,
+      @PathParam("name") String name,
+      TableSpec spec,
+      @QueryParam("version") long version,

Review Comment:
   The version thing is cool. I'm a fan of this sort of thing in CRUD APIs.



##########
server/src/main/java/org/apache/druid/catalog/InputTableSpec.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.TableMetadata.TableType;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Definition of an external input source, primarily for ingestion.
+ * The components are derived from those for Druid ingestion: an
+ * input source, a format and a set of columns. Also provides
+ * properties, as do all table definitions.
+ */
+public class InputTableSpec extends TableSpec
+{
+  private final InputSource inputSource;
+  private final InputFormat format;
+  private final List<InputColumnSpec> columns;
+
+  public InputTableSpec(
+      @JsonProperty("inputSource") InputSource inputSource,
+      @JsonProperty("format") InputFormat format,
+      @JsonProperty("columns") List<InputColumnSpec> columns,
+      @JsonProperty("properties") Map<String, Object> properties
+  )
+  {
+    super(properties);
+    this.inputSource = inputSource;
+    this.format = format;
+    this.columns = columns;
+  }
+
+  @Override
+  public TableType type()
+  {
+    return TableType.INPUT;
+  }
+
+  @JsonProperty("inputSource")
+  public InputSource inputSource()

Review Comment:
   Will there be a way to parameterize the source somehow at runtime? I feel 
that a prime use case for input tables is going to be incremental ingestion, 
meaning the input source will change from statement to statement.



##########
server/src/main/java/org/apache/druid/server/http/CatalogResource.java:
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.Actions;
+import org.apache.druid.catalog.CatalogStorage;
+import org.apache.druid.catalog.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.TableId;
+import org.apache.druid.catalog.TableMetadata;
+import org.apache.druid.catalog.TableSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.catalog.CatalogManager;
+import org.apache.druid.metadata.catalog.CatalogManager.DuplicateKeyException;
+import org.apache.druid.metadata.catalog.CatalogManager.NotFoundException;
+import org.apache.druid.metadata.catalog.CatalogManager.OutOfDateException;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  /**
+   * Create a new table within the indicated schema.
+   *
+   * @param table The table specification to create.
+   * @param ifNew Whether to skip the action if the table already exists.
+   *        This is the same as the SQL IF NOT EXISTS clause. If {@code false},
+   *        then an error is raised if the table exists. If {@code true}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response createTable(
+      TableMetadata table,
+      @QueryParam("ifnew") boolean ifNew,
+      @Context final HttpServletRequest req)
+  {
+    String dbSchema = table.resolveDbSchema();
+    Pair<Response, SchemaSpec> result = validateSchema(dbSchema);
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    SchemaSpec schema = result.rhs;
+    if (!schema.writable()) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format("Cannot create tables in schema %s", dbSchema));
+    }
+    table = table.withSchema(dbSchema);
+    try {
+      table.validate();
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+    TableSpec spec = table.spec();
+    if (!schema.accepts(spec)) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format(
+              "Cannot create tables of type %s in schema %s",
+              spec == null ? "null" : spec.getClass().getSimpleName(),
+              dbSchema));
+    }
+    try {
+      catalog.authorizer().authorizeTable(schema, table.name(), Action.WRITE, 
req);
+    }
+    catch (ForbiddenException e) {
+      return Actions.forbidden(e);
+    }
+    try {
+      long createVersion = catalog.tables().create(table);
+      return Actions.okWithVersion(createVersion);
+    }
+    catch (DuplicateKeyException e) {
+      if (!ifNew) {
+        return Actions.badRequest(
+              Actions.DUPLICATE_ERROR,
+              StringUtils.format(
+                  "A table of name %s.%s aleady exists",
+                  table.dbSchema(),
+                  table.name()));
+      } else {
+        return Actions.okWithVersion(0);

Review Comment:
   Should this return the current version of the already-existing table?



##########
server/src/main/java/org/apache/druid/catalog/DatasourceColumnSpec.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Description of a detail datasource column and a rollup
+ * dimension column.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = "detail", value = 
DatasourceColumnSpec.DetailColumnSpec.class),
+    @Type(name = "dimension", value = 
DatasourceColumnSpec.DimensionSpec.class),
+    @Type(name = "measure", value = DatasourceColumnSpec.MeasureSpec.class),
+})
+public abstract class DatasourceColumnSpec extends ColumnSpec
+{
+  private static final String TIME_COLUMN = "__time";
+
+  @JsonCreator
+  public DatasourceColumnSpec(
+      @JsonProperty("name") String name,
+      @JsonProperty("sqlType") String sqlType
+  )
+  {
+    super(name, sqlType);
+  }
+
+  @Override
+  public void validate()
+  {
+    super.validate();
+    if (sqlType == null) {
+      return;
+    }
+    if (TIME_COLUMN.equals(name)) {
+      if (!"TIMESTAMP".equalsIgnoreCase(sqlType)) {
+        throw new IAE("__time column must have type TIMESTAMP");
+      }
+    } else if (!VALID_SQL_TYPES.containsKey(StringUtils.toUpperCase(sqlType))) 
{
+      throw new IAE("Not a supported SQL type: " + sqlType);
+    }
+  }
+
+  public static class DetailColumnSpec extends DatasourceColumnSpec
+  {
+    @JsonCreator
+    public DetailColumnSpec(
+        @JsonProperty("name") String name,
+        @JsonProperty("sqlType") String sqlType
+    )
+    {
+      super(name, sqlType);
+    }
+
+    @Override
+    protected ColumnKind kind()
+    {
+      return ColumnKind.DETAIL;
+    }
+  }
+
+  public static class DimensionSpec extends DatasourceColumnSpec
+  {
+    @JsonCreator
+    public DimensionSpec(
+        @JsonProperty("name") String name,
+        @JsonProperty("sqlType") String sqlType
+    )
+    {
+      super(name, sqlType);
+    }
+
+    @Override
+    protected ColumnKind kind()
+    {
+      return ColumnKind.DIMENSION;
+    }
+  }
+
+  /**
+   * Catalog definition of a measure (metric) column.
+   */
+  public static class MeasureSpec extends DatasourceColumnSpec
+  {
+    private final String aggregateFn;
+
+    @JsonCreator
+    public MeasureSpec(
+        @JsonProperty("name") String name,
+        @JsonProperty("sqlType") String sqlType,
+        @JsonProperty("aggregateFn") String aggregateFn

Review Comment:
   What kind of string is meant to be in here? How are we going to interpret it?
   
   I'm especially interested in this because I've recently been contemplating 
the best way to write an INSERT or REPLACE into a rollup table.



##########
server/src/main/java/org/apache/druid/catalog/DatasourceColumnSpec.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Description of a detail datasource column and a rollup
+ * dimension column.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = "detail", value = 
DatasourceColumnSpec.DetailColumnSpec.class),

Review Comment:
   Is 'detail' a standard (de jure or de facto) term?



##########
server/src/main/java/org/apache/druid/server/http/CatalogResource.java:
##########
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.common.base.Strings;
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.druid.catalog.Actions;
+import org.apache.druid.catalog.CatalogStorage;
+import org.apache.druid.catalog.SchemaRegistry.SchemaSpec;
+import org.apache.druid.catalog.TableId;
+import org.apache.druid.catalog.TableMetadata;
+import org.apache.druid.catalog.TableSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.catalog.CatalogManager;
+import org.apache.druid.metadata.catalog.CatalogManager.DuplicateKeyException;
+import org.apache.druid.metadata.catalog.CatalogManager.NotFoundException;
+import org.apache.druid.metadata.catalog.CatalogManager.OutOfDateException;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.ResourceType;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * REST endpoint for user and internal catalog actions. Catalog actions
+ * occur at the global level (all schemas), the schema level, or the
+ * table level.
+ *
+ * @see {@link CatalogListenerResource} for the client-side API.
+ */
+@Path(CatalogResource.ROOT_PATH)
+public class CatalogResource
+{
+  public static final String ROOT_PATH = "/druid/coordinator/v1/catalog";
+
+  private final CatalogStorage catalog;
+
+  @Inject
+  public CatalogResource(CatalogStorage catalog)
+  {
+    this.catalog = catalog;
+  }
+
+  /**
+   * Create a new table within the indicated schema.
+   *
+   * @param table The table specification to create.
+   * @param ifNew Whether to skip the action if the table already exists.
+   *        This is the same as the SQL IF NOT EXISTS clause. If {@code false},
+   *        then an error is raised if the table exists. If {@code true}, then
+   *        the action silently does nothing if the table exists. Primarily for
+   *        use in scripts.
+   * @param req the HTTP request used for authorization.
+   * @return the version number of the table
+   */
+  @POST
+  @Path("/tables")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response createTable(
+      TableMetadata table,
+      @QueryParam("ifnew") boolean ifNew,
+      @Context final HttpServletRequest req)
+  {
+    String dbSchema = table.resolveDbSchema();
+    Pair<Response, SchemaSpec> result = validateSchema(dbSchema);
+    if (result.lhs != null) {
+      return result.lhs;
+    }
+    SchemaSpec schema = result.rhs;
+    if (!schema.writable()) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format("Cannot create tables in schema %s", dbSchema));
+    }
+    table = table.withSchema(dbSchema);
+    try {
+      table.validate();
+    }
+    catch (IAE e) {
+      return Actions.badRequest(Actions.INVALID, e.getMessage());
+    }
+    TableSpec spec = table.spec();
+    if (!schema.accepts(spec)) {
+      return Actions.badRequest(
+          Actions.INVALID,
+          StringUtils.format(
+              "Cannot create tables of type %s in schema %s",
+              spec == null ? "null" : spec.getClass().getSimpleName(),
+              dbSchema));
+    }
+    try {
+      catalog.authorizer().authorizeTable(schema, table.name(), Action.WRITE, 
req);

Review Comment:
   The authorization check should come earlier (ideally, as the first thing the 
method does).



##########
server/src/main/java/org/apache/druid/catalog/DatasourceSpec.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.DatasourceColumnSpec.DetailColumnSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.DimensionSpec;
+import org.apache.druid.catalog.DatasourceColumnSpec.MeasureSpec;
+import org.apache.druid.catalog.TableMetadata.TableType;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Datasource metadata exchanged via the REST API and stored
+ * in the catalog.
+ */
+public class DatasourceSpec extends TableSpec
+{
+  /**
+   * Segment grain at ingestion and initial compaction. Aging rules
+   * may override the value as segments age. If not provided here,
+   * then it must be provided at ingestion time.
+   */
+  private final String segmentGranularity;
+
+  /**
+   * Ingestion and auto-compaction rollup granularity. If null, then no
+   * rollup is enabled. Same as {@code queryGranularity} in an ingest spec,
+   * but renamed since this granularity affects rollup, not queries. Can be
+   * overridden at ingestion time. The grain may change as segments evolve:
+   * this is the grain only for ingest.
+   */
+  private final String rollupGranularity;
+
+  /**
+   * The target segment size at ingestion and initial compaction.
+   * If 0, then the system setting is used.
+   */
+  private final int targetSegmentRows;
+
+  /**
+   * Whether to enable auto-compaction. Only relevant if no auto-compaction
+   * spec is defined, since the existence of a spec overrides this setting.
+   */
+  private final boolean enableAutoCompaction;
+
+  /**
+   * The offset of segments to be auto-compacted relative to the current
+   * time. If not present, the auto-compaction default is used if
+   * auto-compaction is enabled.
+   */
+  private final String autoCompactionDelay;
+
+  private final List<DatasourceColumnSpec> columns;
+
+  public DatasourceSpec(
+      @JsonProperty("segmentGranularity") String segmentGranularity,
+      @JsonProperty("rollupGranularity") String rollupGranularity,

Review Comment:
   Generally we call this `queryGranularity`. But! What do you think about 
getting rid of it? I've started to think that it makes more sense to represent 
this as an explicit `TIME_FLOOR` function call rather than a table property.



##########
server/src/main/java/org/apache/druid/catalog/Actions.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.server.security.ForbiddenException;
+
+import javax.ws.rs.core.Response;
+
+import java.util.Map;
+
+/**
+ * Helper functions for the catalog REST API actions.
+ */
+public class Actions

Review Comment:
   Are you thinking this class would be useful for other server APIs in the 
future? It seems written in such a way that it would be.



##########
server/src/main/java/org/apache/druid/catalog/InputTableSpec.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Strings;
+import org.apache.druid.catalog.TableMetadata.TableType;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Definition of an external input source, primarily for ingestion.
+ * The components are derived from those for Druid ingestion: an
+ * input source, a format and a set of columns. Also provides
+ * properties, as do all table definitions.
+ */
+public class InputTableSpec extends TableSpec
+{
+  private final InputSource inputSource;
+  private final InputFormat format;
+  private final List<InputColumnSpec> columns;
+
+  public InputTableSpec(
+      @JsonProperty("inputSource") InputSource inputSource,
+      @JsonProperty("format") InputFormat format,

Review Comment:
   I'd go with `inputSource` + `inputFormat`, for rhyming purposes, and because 
it's what indexing tasks and ExternalDataSource do.



##########
server/src/main/java/org/apache/druid/catalog/ColumnSpec.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.guice.annotations.PublicApi;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.Map;
+
+/**
+ * Base class for table columns. Columns have multiple types
+ * represented as subclasses.
+ */
+@PublicApi

Review Comment:
   IMO it'd be better to tag these as `@UnstableApi` rather than `@PublicApi`, 
since we'll likely be evolving these in the early days of the catalog as the 
use cases become more clear, and we want to signal to extension authors that 
they should plan to track our work closely if they are using these classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to