paul-rogers commented on code in PR #13165: URL: https://github.com/apache/druid/pull/13165#discussion_r1003847189
########## server/src/main/java/org/apache/druid/catalog/model/TableDefn.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.model; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.catalog.model.Properties.PropertyDefn; +import org.apache.druid.java.util.common.IAE; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Definition for all tables in the catalog. All tables have both + * properties and a schema. Subclasses define specific table types + * such as datasources or input tables. Some tables may be parameterized + * to allow the table to appear in a SQL table function by implementing + * the {@link Parameterized} interface. + */ +public class TableDefn extends ObjectDefn +{ + /** + * Human-readable description of the datasource. 
+ */ + public static final String DESCRIPTION_PROPERTY = "description"; + + private final Map<String, ColumnDefn> columnDefns; + + public TableDefn( + final String name, + final String typeValue, + final List<PropertyDefn> properties, + final List<ColumnDefn> columnDefns + ) + { + super( + name, + typeValue, + CatalogUtils.concatLists( + Collections.singletonList( + new Properties.StringPropertyDefn(DESCRIPTION_PROPERTY) + ), + properties + ) + ); + this.columnDefns = columnDefns == null ? Collections.emptyMap() : toColumnMap(columnDefns); + } + + public static Map<String, ColumnDefn> toColumnMap(final List<ColumnDefn> colTypes) + { + ImmutableMap.Builder<String, ColumnDefn> builder = ImmutableMap.builder(); + for (ColumnDefn colType : colTypes) { + builder.put(colType.typeValue(), colType); + } + return builder.build(); + } + + /** + * Validate a table spec using the table, field and column definitions defined + * here. The column definitions validate the type of each property value using + * the object mapper. + */ + public void validate(ResolvedTable table) + { + validate(table.properties(), table.jsonMapper()); + if (table.spec().columns() == null) { + return; Review Comment: Yes. Such tables are perfectly legal (if not very interesting) in SQL. Allowing such a table avoids special cases when a table is first created, or as the user removes columns one-by-one. Since the catalog holds hints, it may be that the user only wants to say something about the table as a whole (the segment granularity, say), but nothing about columns. In that case, the column list (in the catalog) will be empty even though physical segments do have columns. ########## server/src/main/java/org/apache/druid/catalog/model/table/ClusterKeySpec.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.model.table; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.Objects; + +public class ClusterKeySpec +{ + private final String expr; + private final boolean desc; Review Comment: My understanding is that clustering is sorting and sorting is either ascending or descending. (The Druid implementation seems to always use nulls-first sorting.) See `SortColumn` in MSQ. In fact, maybe this should just reuse `SortColumn`, now that I know that class exists... Can you give me a hint as to what other options we might offer in the future? That way, we can figure out how to encode them. I believe @clintropolis is working on some cool new indexing ideas. If that results in more than one index choice per column (i.e. front-encoding or not), then we can add properties to columns for per-column choices, or to the table for cross-column choices. (The reason that clustering is a table property is that it includes more than one column.) 
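To illustrate the "clustering is sorting" point above: a `(column, desc)` cluster key pair behaves like a per-column comparator. The sketch below is a hypothetical stand-in (plain JDK `Comparator`, not Druid's `SortColumn` or `ClusterKeySpec`) showing how the `desc` flag flips an otherwise nulls-first ascending order.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ClusterSortSketch
{
  // Build a comparator for one clustering key: nulls-first ascending by
  // default, reversed when desc is set (mirroring the `desc` flag above).
  public static Comparator<String[]> keyComparator(final int column, final boolean desc)
  {
    Comparator<String> cmp = Comparator.nullsFirst(Comparator.<String>naturalOrder());
    if (desc) {
      // Note: reversing nullsFirst yields nulls-last; Druid's actual null
      // ordering may differ. This is only an illustration.
      cmp = cmp.reversed();
    }
    final Comparator<String> c = cmp;
    return (a, b) -> c.compare(a[column], b[column]);
  }

  public static void main(String[] args)
  {
    List<String[]> rows = Arrays.asList(
        new String[]{"b"}, new String[]{null}, new String[]{"a"});
    rows.sort(keyComparator(0, false));
    for (String[] r : rows) {
      System.out.println(r[0]);   // null, a, b
    }
  }
}
```

A multi-column cluster key would chain such comparators with `thenComparing`, which is why clustering is naturally a table-level property spanning several columns.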
########## core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java: ########## @@ -89,7 +95,50 @@ public static void writeObjectUsingSerializerProvider( } } - private JacksonUtils() + /** + * Convert the given object to an array of bytes. Use when the object is + * known to be serializable so that the Jackson exception can be suppressed. + */ + public static byte[] toBytes(ObjectMapper jsonMapper, Object obj) { + try { + return jsonMapper.writeValueAsBytes(obj); + } + catch (JsonProcessingException e) { + throw new ISE(e, "Failed to serialize " + obj.getClass().getSimpleName()); + } + } + + /** + * Deserialize an object from an array of bytes. Use when the object is + * known to be deserializable so that the Jackson exception can be suppressed. + */ + public static <T> T fromBytes(ObjectMapper jsonMapper, byte[] bytes, Class<T> clazz) + { + try { + return jsonMapper.readValue(bytes, clazz); + } + catch (IOException e) { + throw new ISE(e, "Failed to deserialize a " + clazz.getSimpleName()); + } + } + + /** + * Quick & easy implementation of {@code toString()} for objects which are + * primarily representations of JSON objects. Use only for cases where the + * {@code toString()} is for debugging: the cost of creating an object mapper + * every time is undesirable for production code. Also, assumes that the + * type can be serialized using the default mapper: doesn't work for types that + * require custom Jackson extensions. + */ + public static String toString(Object obj) + { + ObjectMapper jsonMapper = new ObjectMapper(); Review Comment: We probably should. However, that class is not visible to the `core` package: it is in `processing` which depends on `core`. I think, for this reason, I'll move this back into the catalog package since it exploits the fact that the catalog objects are simple and don't need the Guice-based config for the `toString()` to work, but other objects in Druid do need that other mechanism.
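The `toBytes`/`fromBytes` helpers above follow a common pattern: when a type is known to round-trip safely, wrap the checked serialization exception in an unchecked one so every caller need not declare it. A minimal sketch of that pattern, using JDK serialization as a stand-in for Jackson (the `SerdeUtils` name and use of `IllegalStateException` are illustrative, not Druid's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerdeUtils
{
  public static byte[] toBytes(Serializable obj)
  {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(obj);
      out.flush();
      return bos.toByteArray();
    }
    catch (IOException e) {
      // "Can't happen" for known-serializable types; surface as unchecked,
      // keeping the cause so the stack trace survives.
      throw new IllegalStateException("Failed to serialize " + obj.getClass().getSimpleName(), e);
    }
  }

  public static <T> T fromBytes(byte[] bytes, Class<T> clazz)
  {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return clazz.cast(in.readObject());
    }
    catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Failed to deserialize a " + clazz.getSimpleName(), e);
    }
  }

  public static void main(String[] args)
  {
    byte[] bytes = toBytes("hello");
    System.out.println(fromBytes(bytes, String.class));
  }
}
```

Preserving the original exception as the cause (as `fromBytes` in the diff does) is worth doing in both directions, since a bare message loses the underlying Jackson diagnostics.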
########## server/src/main/java/org/apache/druid/catalog/model/Properties.java: ########## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.model; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.joda.time.Period; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Definition of a top-level property in a catalog object. + * Provides a set of typical property definitions. Others can be + * created case-by-case. + * <p> + * Property definitions define the property name, validate the value, + * and merge updates. Properties have a type: but the type is implicit + * via the validation, as is needed when the type is actually a map + * which represents a Java object, or when the value is a list. + */ +public interface Properties Review Comment: That's why Java has packages! 
But I did go ahead and rename this interface. ########## extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.catalog.storage.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.TableSpec; +import org.apache.druid.catalog.storage.MetastoreManager; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.Update; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Function; + +@ManageLifecycle +public class SQLCatalogManager implements CatalogManager +{ + public static final String TABLES_TABLE = "tableDefs"; + + private static final String INSERT_TABLE = + "INSERT INTO %s\n" + + " (schemaName, name, creationTime, updateTime, state, payload)\n" + + " VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)"; + + private static final String UPDATE_HEAD = + "UPDATE %s\n SET\n"; + + private static final String WHERE_TABLE_ID = + "WHERE schemaName = :schemaName\n" + + " AND name = :name\n"; + + private static final String SAFETY_CHECK = + " AND updateTime = :oldVersion"; + + private static final String UPDATE_DEFN_UNSAFE = + UPDATE_HEAD + + " payload = :payload,\n" + + 
" updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String UPDATE_DEFN_SAFE = + UPDATE_DEFN_UNSAFE + + SAFETY_CHECK; + + private static final String UPDATE_STATE = + UPDATE_HEAD + + " state = :state,\n" + + " updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String SELECT_TABLE = + "SELECT creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_PAYLOAD = + "SELECT state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_ALL_TABLES = + "SELECT schemaName, name\n" + + "FROM %s\n" + + "ORDER BY schemaName, name"; + + private static final String SELECT_TABLES_IN_SCHEMA = + "SELECT name\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String SELECT_TABLE_DETAILS_IN_SCHEMA = + "SELECT name, creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String DELETE_TABLE = + "DELETE FROM %s\n" + + WHERE_TABLE_ID; + + private final MetastoreManager metastoreManager; + private final SQLMetadataConnector connector; + private final ObjectMapper jsonMapper; + private final IDBI dbi; + private final String tableName; + private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>(); + + @Inject + public SQLCatalogManager(MetastoreManager metastoreManager) + { + if (!metastoreManager.isSql()) { + throw new ISE("SQLCatalogManager only works with SQL based metadata store at this time"); + } + this.metastoreManager = metastoreManager; + this.connector = metastoreManager.sqlConnector(); + this.dbi = connector.getDBI(); + this.jsonMapper = metastoreManager.jsonMapper(); + this.tableName = getTableDefnTable(); + } + + @Override + @LifecycleStart + public void start() + { + createTableDefnTable(); + } + + @Override + public void stop() + { + } + + // Mimics what MetadataStorageTablesConfig should 
do. + public String getTableDefnTable() + { + final String base = metastoreManager.tablesConfig().getBase(); + if (Strings.isNullOrEmpty(base)) { + return TABLES_TABLE; + } else { + return StringUtils.format("%s_%s", base, TABLES_TABLE); + } + } + + // TODO: Move to SqlMetadataConnector + @Override + public void createTableDefnTable() + { + if (!metastoreManager.createTables()) { + return; + } + connector.createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %s (\n" Review Comment: On the first suggestion: I was dutifully copying the format used in `SQLMetadataConnector` in anticipation that this code may move into that other class if/when the catalog becomes part of the core Druid. As for the SQL layer: we need to do something. The current code works, but seems a decade behind best practices. We mostly assume SQL, but not really. The JDBI stuff is clever, but a bit old-school. Heck, we can't even evolve schemas, so the catalog schema we release here has to stay fixed for all time. That's a key reason I'm hedging my bets on some of the DB stuff. To be honest, on this DB integration layer, I mostly disagree with the approach we use, but did reluctantly follow Druid patterns to make reviews easier. ########## extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.storage.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.TableSpec; +import org.apache.druid.catalog.storage.MetastoreManager; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.Update; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Function; + +@ManageLifecycle +public class SQLCatalogManager implements CatalogManager +{ + public static final String TABLES_TABLE = "tableDefs"; + + private static final String INSERT_TABLE = + "INSERT INTO %s\n" + + " (schemaName, name, creationTime, updateTime, state, payload)\n" + + " 
VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)"; + + private static final String UPDATE_HEAD = + "UPDATE %s\n SET\n"; + + private static final String WHERE_TABLE_ID = + "WHERE schemaName = :schemaName\n" + + " AND name = :name\n"; + + private static final String SAFETY_CHECK = + " AND updateTime = :oldVersion"; + + private static final String UPDATE_DEFN_UNSAFE = + UPDATE_HEAD + + " payload = :payload,\n" + + " updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String UPDATE_DEFN_SAFE = + UPDATE_DEFN_UNSAFE + + SAFETY_CHECK; + + private static final String UPDATE_STATE = + UPDATE_HEAD + + " state = :state,\n" + + " updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String SELECT_TABLE = + "SELECT creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_PAYLOAD = + "SELECT state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_ALL_TABLES = + "SELECT schemaName, name\n" + + "FROM %s\n" + + "ORDER BY schemaName, name"; + + private static final String SELECT_TABLES_IN_SCHEMA = + "SELECT name\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String SELECT_TABLE_DETAILS_IN_SCHEMA = + "SELECT name, creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String DELETE_TABLE = + "DELETE FROM %s\n" + + WHERE_TABLE_ID; + + private final MetastoreManager metastoreManager; + private final SQLMetadataConnector connector; + private final ObjectMapper jsonMapper; + private final IDBI dbi; + private final String tableName; + private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>(); + + @Inject + public SQLCatalogManager(MetastoreManager metastoreManager) + { + if (!metastoreManager.isSql()) { + throw new ISE("SQLCatalogManager only works with SQL based 
metadata store at this time"); + } + this.metastoreManager = metastoreManager; + this.connector = metastoreManager.sqlConnector(); + this.dbi = connector.getDBI(); + this.jsonMapper = metastoreManager.jsonMapper(); + this.tableName = getTableDefnTable(); + } + + @Override + @LifecycleStart + public void start() + { + createTableDefnTable(); + } + + @Override + public void stop() + { + } + + // Mimics what MetadataStorageTablesConfig should do. + public String getTableDefnTable() + { + final String base = metastoreManager.tablesConfig().getBase(); + if (Strings.isNullOrEmpty(base)) { + return TABLES_TABLE; + } else { + return StringUtils.format("%s_%s", base, TABLES_TABLE); + } + } + + // TODO: Move to SqlMetadataConnector + @Override + public void createTableDefnTable() + { + if (!metastoreManager.createTables()) { + return; + } + connector.createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %s (\n" + + " schemaName VARCHAR(255) NOT NULL,\n" + + " name VARCHAR(255) NOT NULL,\n" + + " creationTime BIGINT NOT NULL,\n" + + " updateTime BIGINT NOT NULL,\n" + + " state CHAR(1) NOT NULL,\n" + + " payload %s,\n" + + " PRIMARY KEY(schemaName, name)\n" + + ")", + tableName, + connector.getPayloadType()))); + } + + @Override + public long create(TableMetadata table) throws DuplicateKeyException + { + try { + return dbi.withHandle( + new HandleCallback<Long>() + { + @Override + public Long withHandle(Handle handle) throws DuplicateKeyException + { + long updateTime = System.currentTimeMillis(); + Update stmt = handle.createStatement( + StringUtils.format(INSERT_TABLE, tableName) + ) + .bind("schemaName", table.id().schema()) + .bind("name", table.id().name()) + .bind("creationTime", updateTime) + .bind("updateTime", updateTime) + .bind("state", TableMetadata.TableState.ACTIVE.code()) + .bind("payload", table.spec().toBytes(jsonMapper)); + try { + stmt.execute(); + } + catch (UnableToExecuteStatementException e) { + if 
(DbUtils.isDuplicateRecordException(e)) { + throw new DuplicateKeyException( + "Tried to insert a duplicate table: " + table.sqlName(), + e); + } else { + throw e; + } + } + sendAddition(table, updateTime); + return updateTime; + } + } + ); + } + catch (CallbackFailedException e) { + if (e.getCause() instanceof DuplicateKeyException) { + throw (DuplicateKeyException) e.getCause(); + } + throw e; + } + } + + @Override + public TableMetadata read(TableId id) + { + return dbi.withHandle( + new HandleCallback<TableMetadata>() + { + @Override + public TableMetadata withHandle(Handle handle) + { + Query<Map<String, Object>> query = handle.createQuery( + StringUtils.format(SELECT_TABLE, tableName) + ) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("schemaName", id.schema()) + .bind("name", id.name()); + final ResultIterator<TableMetadata> resultIterator = + query.map((index, r, ctx) -> + new TableMetadata( + id, + r.getLong(1), + r.getLong(2), + TableMetadata.TableState.fromCode(r.getString(3)), + TableSpec.fromBytes(jsonMapper, r.getBytes(4)) + )) + .iterator(); + if (resultIterator.hasNext()) { + return resultIterator.next(); + } + return null; + } + } + ); + } + + @Override + public long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException + { + if (oldVersion == 0) { + return updateUnsafe(table.id(), table.spec()); + } else { + return updateSafe(table.id(), table.spec(), oldVersion); + } + } + + private long updateSafe(TableId id, TableSpec defn, long oldVersion) throws OutOfDateException + { + try { + return dbi.withHandle( + new HandleCallback<Long>() + { + @Override + public Long withHandle(Handle handle) throws OutOfDateException + { + long updateTime = System.currentTimeMillis(); + int updateCount = handle.createStatement( + StringUtils.format(UPDATE_DEFN_SAFE, tableName)) + .bind("schemaName", id.schema()) + .bind("name", id.name()) + .bind("payload", defn.toBytes(jsonMapper)) + .bind("updateTime", updateTime) + 
.bind("oldVersion", oldVersion) + .execute(); + if (updateCount == 0) { + throw new OutOfDateException( + StringUtils.format( + "Table %s: not found or update version does not match DB version", + id.sqlName())); + } + sendUpdate(id); + return updateTime; + } + } + ); + } + catch (CallbackFailedException e) { + if (e.getCause() instanceof OutOfDateException) { + throw (OutOfDateException) e.getCause(); + } + throw e; + } + } + + private long updateUnsafe(TableId id, TableSpec defn) throws NotFoundException + { + try { + return dbi.withHandle( + new HandleCallback<Long>() + { + @Override + public Long withHandle(Handle handle) throws NotFoundException + { + long updateTime = System.currentTimeMillis(); + int updateCount = handle.createStatement( + StringUtils.format(UPDATE_DEFN_UNSAFE, tableName)) + .bind("schemaName", id.schema()) + .bind("name", id.name()) + .bind("payload", defn.toBytes(jsonMapper)) + .bind("updateTime", updateTime) + .execute(); + if (updateCount == 0) { + throw new NotFoundException( + StringUtils.format("Table %s: not found", id.sqlName()) + ); + } + sendUpdate(id); + return updateTime; + } + } + ); + } + catch (CallbackFailedException e) { + if (e.getCause() instanceof NotFoundException) { + throw (NotFoundException) e.getCause(); + } + throw e; + } + } + + @Override + public long updatePayload(TableId id, Function<TableSpec, TableSpec> transform) throws NotFoundException + { + try { + return dbi.withHandle( + new HandleCallback<Long>() + { + @Override + public Long withHandle(Handle handle) throws NotFoundException + { + handle.begin(); + try { + Query<Map<String, Object>> query = handle.createQuery( + StringUtils.format(SELECT_PAYLOAD, tableName) + ) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("schemaName", id.schema()) + .bind("name", id.name()); + + final ResultIterator<TableMetadata> resultIterator = + query.map((index, r, ctx) -> + new TableMetadata( + id, + 0, + 0, + TableMetadata.TableState.fromCode(r.getString(1)), + 
TableSpec.fromBytes(jsonMapper, r.getBytes(2)) + )) + .iterator(); + TableMetadata table; + if (resultIterator.hasNext()) { + table = resultIterator.next(); + } else { + handle.rollback(); + throw new NotFoundException( + StringUtils.format("Table %s: not found", id.sqlName()) + ); + } + if (table.state() != TableMetadata.TableState.ACTIVE) { + throw new ISE("Table is in state [%s] and cannot be updated", table.state()); + } + TableSpec revised = transform.apply(table.spec()); + long updateTime = System.currentTimeMillis(); + int updateCount = handle.createStatement( + StringUtils.format(UPDATE_DEFN_UNSAFE, tableName)) + .bind("schemaName", id.schema()) + .bind("name", id.name()) + .bind("payload", revised.toBytes(jsonMapper)) + .bind("updateTime", updateTime) + .execute(); + if (updateCount == 0) { + // Should never occur because we're holding a lock. + throw new ISE("Table %s: not found", id.sqlName()); + } + handle.commit(); + sendUpdate(id); + return updateTime; + } + catch (RuntimeException e) { + handle.rollback(); + throw e; + } + } + } + ); + } + catch (CallbackFailedException e) { + if (e.getCause() instanceof NotFoundException) { + throw (NotFoundException) e.getCause(); + } + throw e; + } + } + + @Override + public long markDeleting(TableId id) Review Comment: In a comment above I noted that Druid has no ability to evolve its metadata DB schema: the first schema for a table has to remain fixed for all time. This means that we have to carefully ensure we get the schema right the first time. That, in turn, requires that we violate the agile principle which says to only add code for today's features. So, what other columns might we need in our table metadata table? We will need state coordination when we add DDL statements. Specifically, the ability to handle a `DELETE TABLE` request: * Mark the metadata as in the `DELETING` state. This "locks" the table name to prevent any attempt to reuse that name, and tells the planner to disallow queries on that table.
* Launch a potentially long-running job to unload segments and delete them. * Remove the catalog entry for the table. Given this, a table can never transition from `DELETING` back to `ACTIVE`. Note that, at present, we _don't_ have the `DELETE TABLE` DDL statement, or the above logic, so this field is purely in anticipation of how we _would_ implement it _when_ we choose to do so. ########## extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.catalog.storage.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.TableSpec; +import org.apache.druid.catalog.storage.MetastoreManager; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.Update; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Function; + +@ManageLifecycle +public class SQLCatalogManager implements CatalogManager +{ + public static final String TABLES_TABLE = "tableDefs"; + + private static final String INSERT_TABLE = + "INSERT INTO %s\n" + + " (schemaName, name, creationTime, updateTime, state, payload)\n" + + " VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)"; + + private static final String UPDATE_HEAD = + "UPDATE %s\n SET\n"; + + private static final String WHERE_TABLE_ID = + "WHERE schemaName = :schemaName\n" + + " AND name = :name\n"; + + private static final String SAFETY_CHECK = + " AND updateTime = :oldVersion"; + + private static final String UPDATE_DEFN_UNSAFE = + UPDATE_HEAD + + " payload = :payload,\n" + + 
" updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String UPDATE_DEFN_SAFE = + UPDATE_DEFN_UNSAFE + + SAFETY_CHECK; + + private static final String UPDATE_STATE = + UPDATE_HEAD + + " state = :state,\n" + + " updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String SELECT_TABLE = + "SELECT creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_PAYLOAD = + "SELECT state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_ALL_TABLES = + "SELECT schemaName, name\n" + + "FROM %s\n" + + "ORDER BY schemaName, name"; + + private static final String SELECT_TABLES_IN_SCHEMA = + "SELECT name\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String SELECT_TABLE_DETAILS_IN_SCHEMA = + "SELECT name, creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String DELETE_TABLE = + "DELETE FROM %s\n" + + WHERE_TABLE_ID; + + private final MetastoreManager metastoreManager; + private final SQLMetadataConnector connector; + private final ObjectMapper jsonMapper; + private final IDBI dbi; + private final String tableName; + private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>(); + + @Inject + public SQLCatalogManager(MetastoreManager metastoreManager) + { + if (!metastoreManager.isSql()) { + throw new ISE("SQLCatalogManager only works with SQL based metadata store at this time"); + } + this.metastoreManager = metastoreManager; + this.connector = metastoreManager.sqlConnector(); + this.dbi = connector.getDBI(); + this.jsonMapper = metastoreManager.jsonMapper(); + this.tableName = getTableDefnTable(); + } + + @Override + @LifecycleStart + public void start() + { + createTableDefnTable(); + } + + @Override + public void stop() + { + } + + // Mimics what MetadataStorageTablesConfig should 
do. + public String getTableDefnTable() + { + final String base = metastoreManager.tablesConfig().getBase(); + if (Strings.isNullOrEmpty(base)) { + return TABLES_TABLE; + } else { + return StringUtils.format("%s_%s", base, TABLES_TABLE); + } + } + + // TODO: Move to SqlMetadataConnector + @Override + public void createTableDefnTable() + { + if (!metastoreManager.createTables()) { + return; + } + connector.createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %s (\n" + + " schemaName VARCHAR(255) NOT NULL,\n" + + " name VARCHAR(255) NOT NULL,\n" + + " creationTime BIGINT NOT NULL,\n" + + " updateTime BIGINT NOT NULL,\n" + + " state CHAR(1) NOT NULL,\n" + + " payload %s,\n" + + " PRIMARY KEY(schemaName, name)\n" + + ")", + tableName, + connector.getPayloadType()))); + } + + @Override + public long create(TableMetadata table) throws DuplicateKeyException + { + try { + return dbi.withHandle( + new HandleCallback<Long>() + { + @Override + public Long withHandle(Handle handle) throws DuplicateKeyException + { + long updateTime = System.currentTimeMillis(); + Update stmt = handle.createStatement( + StringUtils.format(INSERT_TABLE, tableName) + ) + .bind("schemaName", table.id().schema()) + .bind("name", table.id().name()) + .bind("creationTime", updateTime) + .bind("updateTime", updateTime) + .bind("state", TableMetadata.TableState.ACTIVE.code()) + .bind("payload", table.spec().toBytes(jsonMapper)); + try { + stmt.execute(); + } + catch (UnableToExecuteStatementException e) { + if (DbUtils.isDuplicateRecordException(e)) { + throw new DuplicateKeyException( + "Tried to insert a duplicate table: " + table.sqlName(), + e); + } else { + throw e; + } + } + sendAddition(table, updateTime); + return updateTime; + } + } + ); + } + catch (CallbackFailedException e) { + if (e.getCause() instanceof DuplicateKeyException) { + throw (DuplicateKeyException) e.getCause(); + } + throw e; + } + } + + @Override + public TableMetadata read(TableId id) + { + 
return dbi.withHandle( + new HandleCallback<TableMetadata>() + { + @Override + public TableMetadata withHandle(Handle handle) + { + Query<Map<String, Object>> query = handle.createQuery( + StringUtils.format(SELECT_TABLE, tableName) + ) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("schemaName", id.schema()) + .bind("name", id.name()); + final ResultIterator<TableMetadata> resultIterator = + query.map((index, r, ctx) -> + new TableMetadata( + id, + r.getLong(1), + r.getLong(2), + TableMetadata.TableState.fromCode(r.getString(3)), + TableSpec.fromBytes(jsonMapper, r.getBytes(4)) + )) + .iterator(); + if (resultIterator.hasNext()) { + return resultIterator.next(); + } + return null; + } + } + ); + } + + @Override + public long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException + { + if (oldVersion == 0) { + return updateUnsafe(table.id(), table.spec()); + } else { + return updateSafe(table.id(), table.spec(), oldVersion); + } + } + + private long updateSafe(TableId id, TableSpec defn, long oldVersion) throws OutOfDateException + { + try { + return dbi.withHandle( + new HandleCallback<Long>() + { + @Override + public Long withHandle(Handle handle) throws OutOfDateException + { + long updateTime = System.currentTimeMillis(); + int updateCount = handle.createStatement( + StringUtils.format(UPDATE_DEFN_SAFE, tableName)) + .bind("schemaName", id.schema()) + .bind("name", id.name()) + .bind("payload", defn.toBytes(jsonMapper)) + .bind("updateTime", updateTime) + .bind("oldVersion", oldVersion) + .execute(); + if (updateCount == 0) { + throw new OutOfDateException( + StringUtils.format( + "Table %s: not found or update version does not match DB version", + id.sqlName())); + } + sendUpdate(id); + return updateTime; + } + } + ); + } + catch (CallbackFailedException e) { + if (e.getCause() instanceof OutOfDateException) { + throw (OutOfDateException) e.getCause(); + } + throw e; + } + } + + private long updateUnsafe(TableId 
id, TableSpec defn) throws NotFoundException Review Comment: The answer is configuration-as-code. On K8s, we can push a new spec without having to match versions. On K8s, the "truth" is in GitHub: the specs stored in K8s reflect that truth. We expect that a valid use of the catalog is to maintain the table spec (and ingest specs, and MSQ ingest queries, and application queries) in GitHub, and push the latest version when anything changes. For that, there is no reason for a version check since whatever is in GitHub is "more right" than anything in Druid. On the other hand, there may be users that want Druid to act more like a regular RDBMS: Druid contains the "truth", and a UI (such as Druid console) maintains the copy. In this case, the UI needs tools to guard against concurrent updates. That tool is the version. ########## server/src/main/java/org/apache/druid/catalog/model/SchemaRegistryImpl.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog.model; + +import org.apache.druid.catalog.model.table.AbstractDatasourceDefn; +import org.apache.druid.catalog.model.table.ExternalTableDefn; +import org.apache.druid.server.security.ResourceType; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Hard-coded schema registry that knows about the well-known, and + * a few obscure, Druid schemas. Does not allow for user-defined + * schemas, which the rest of Druid would not be able to support. + */ +public class SchemaRegistryImpl implements SchemaRegistry +{ + // Mimics the definition in ExternalOperatorConvertion + // TODO: Change this when ExternalOperatorConvertion changes + private String EXTERNAL_RESOURCE = "EXTERNAL"; + + public static class SchemaDefnImpl implements SchemaSpec + { + private final String name; + private final String resource; + private final Set<String> accepts; + + public SchemaDefnImpl( + String name, + String resource, + Set<String> accepts + ) + { + this.name = name; + this.resource = resource; + this.accepts = accepts; + } + + @Override + public String name() + { + return name; + } + + @Override + public String securityResource() + { + return resource; + } + + @Override + public boolean writable() + { + return accepts != null && !accepts.isEmpty(); + } + + @Override + public boolean accepts(String tableType) + { + if (accepts == null) { + return false; + } + return accepts.contains(tableType); + } + } + + private final Map<String, SchemaSpec> builtIns; + + public SchemaRegistryImpl() + { + builtIns = new HashMap<>(); + register(new SchemaDefnImpl( + TableId.DRUID_SCHEMA, + ResourceType.DATASOURCE, + AbstractDatasourceDefn.tableTypes() + )); + register(new SchemaDefnImpl( + TableId.LOOKUP_SCHEMA, + ResourceType.CONFIG, + null // TODO Review Comment: It means that this early draft does not yet include lookup tables and so there are no valid table types for the lookup schema. 
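To make the current behavior concrete, here is a minimal standalone sketch (simplified names, not the PR's actual classes) of how a `null` accepts-set plays out: such a schema is read-only and accepts no table type, which is exactly the lookup schema's state in this draft.

```java
import java.util.Set;

// Standalone sketch, not the PR's SchemaDefnImpl: a schema whose
// "accepts" set is null is read-only, so until lookup table types
// exist the lookup schema accepts no table type at all.
public class SchemaAcceptsSketch
{
  static class SchemaDefn
  {
    private final Set<String> accepts;

    SchemaDefn(Set<String> accepts)
    {
      this.accepts = accepts;
    }

    boolean writable()
    {
      return accepts != null && !accepts.isEmpty();
    }

    boolean accepts(String tableType)
    {
      return accepts != null && accepts.contains(tableType);
    }
  }

  public static void main(String[] args)
  {
    SchemaDefn lookupSchema = new SchemaDefn(null); // no lookup table types yet
    SchemaDefn druidSchema = new SchemaDefn(Set.of("datasource"));

    System.out.println(lookupSchema.writable());           // false: read-only for now
    System.out.println(druidSchema.accepts("datasource")); // true
  }
}
```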
Ideally, that will change as the project progresses so that lookup tables can have entries in the catalog. ########## core/src/main/java/org/apache/druid/data/input/InputFormat.java: ########## @@ -41,15 +41,17 @@ * See {@link NestedInputFormat} for nested input formats such as JSON. */ @UnstableApi -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY) @JsonSubTypes(value = { - @Type(name = "csv", value = CsvInputFormat.class), - @Type(name = "json", value = JsonInputFormat.class), + @Type(name = CsvInputFormat.TYPE_KEY, value = CsvInputFormat.class), + @Type(name = JsonInputFormat.TYPE_KEY, value = JsonInputFormat.class), @Type(name = "regex", value = RegexInputFormat.class), Review Comment: Laziness? I had not yet created catalog equivalents for the regex type, so I had not yet changed the format. Went ahead and tidied that up. ########## server/src/main/java/org/apache/druid/catalog/model/TableMetadata.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; + +import java.util.Objects; + +/** + * REST API level description of a table. Tables have multiple types + * as described by subclasses. Stores the operational aspects of a + * table, such as its name, creation time, state and spec. + * + * @see {@link ResolvedTable} for the semantic representation. + */ +@PublicApi +public class TableMetadata +{ + public enum TableState + { + ACTIVE("A"), + DELETING("D"); + + private final String code; + + TableState(String code) + { + this.code = code; + } + + public String code() + { + return code; + } + + public static TableState fromCode(String code) + { + for (TableState state : values()) { + if (state.code.equals(code)) { + return state; + } + } + throw new ISE("Unknown TableState code: " + code); + } + } + + private final TableId id; + private final long creationTime; + private final long updateTime; + private final TableState state; + private final TableSpec spec; Review Comment: It is just a bit confusing, but here's the story. There are two "views" of the catalog. One view is that it is a set of generic table specs stored in a DB, maintained via the REST API, and shipped from Coordinator to Broker. In that "mechanical" or "syntactic" view, the table spec is what the user provides, and this class, `TableMetadata`, is an in-memory view of the DB record for a table. The other view is the "resolved" or "semantic" view of a table: the one where the spec is joined with the "meta-meta-data" that gives meaning to the generic specs. That view is represented by the `ResolvedTable` class. The idea is that we can perform DB, REST, and sync operations without the resolved info.
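As a standalone illustration of this two-view split (simplified names, not the actual Druid classes): the "mechanical" record carries only DB bookkeeping, while the "resolved" view joins the generic spec with the type knowledge needed for semantic operations.

```java
import java.util.Map;

// Sketch of the two views described above; names are simplified
// stand-ins for TableMetadata and ResolvedTable.
public class TwoViewSketch
{
  // Mechanical view: what the metadata store and REST API move around.
  static class TableRecord
  {
    final String schema;
    final String name;
    final long updateTime;
    final Map<String, Object> spec;

    TableRecord(String schema, String name, long updateTime, Map<String, Object> spec)
    {
      this.schema = schema;
      this.name = name;
      this.updateTime = updateTime;
      this.spec = spec;
    }
  }

  // Semantic view: the spec joined with its table-type definition.
  static class ResolvedView
  {
    final TableRecord record;
    final String tableType; // stand-in for the real table-type definition

    ResolvedView(TableRecord record, String tableType)
    {
      this.record = record;
      this.tableType = tableType;
    }

    Object property(String key)
    {
      return record.spec.get(key);
    }
  }

  public static void main(String[] args)
  {
    TableRecord rec = new TableRecord("druid", "wiki", 0L, Map.of("segmentGranularity", "P1D"));
    ResolvedView resolved = new ResolvedView(rec, "datasource");
    System.out.println(resolved.property("segmentGranularity")); // P1D
  }
}
```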
We can perform semantic operations without the DB artifacts such as timestamps and state. This dual-approach allows generic processing of tables some times (such as when merging properties during an update), while doing table-type-specific processing at other times. ########## extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog.storage.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.TableSpec; +import org.apache.druid.catalog.storage.MetastoreManager; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.Update; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Function; + +@ManageLifecycle +public class SQLCatalogManager implements CatalogManager +{ + public static final String TABLES_TABLE = "tableDefs"; + + private static final String INSERT_TABLE = + "INSERT INTO %s\n" + + " (schemaName, name, creationTime, updateTime, state, payload)\n" + + " VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)"; + + private static final String UPDATE_HEAD = + "UPDATE %s\n SET\n"; + + private static final String WHERE_TABLE_ID = + "WHERE schemaName = :schemaName\n" + + " AND name = :name\n"; + + private static final String SAFETY_CHECK = + " AND updateTime = :oldVersion"; + + private static final String UPDATE_DEFN_UNSAFE = + UPDATE_HEAD + + " payload = :payload,\n" + + 
" updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String UPDATE_DEFN_SAFE = + UPDATE_DEFN_UNSAFE + + SAFETY_CHECK; Review Comment: The idea of generated SQL is that you _don't_ need to see the generated SQL. Though, if you do, you can do what I do: just use the debugger and look at the generated statement. That said, I suppose that, now that the schema is settling down, I don't need the code gen trick and can instead just copy/paste the SQL for each use... I might wait just a bit longer before doing that in case things change some more. ########## server/src/main/java/org/apache/druid/catalog/model/CatalogUtils.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog.model; + +import com.google.common.base.Strings; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.joda.time.Period; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class CatalogUtils +{ + public static List<String> columnNames(List<ColumnSpec> columns) + { + return columns + .stream() + .map(col -> col.name()) + .collect(Collectors.toList()); + } + + /** + * Convert a catalog granularity string to the Druid form. Catalog granularities + * are either the usual descriptive strings (in any case), or an ISO period. + * For the odd interval, the interval name is also accepted (for the other + * intervals, the interval name is the descriptive string). + */ + public static Granularity asDruidGranularity(String value) + { + if (Strings.isNullOrEmpty(value)) { + return Granularities.ALL; + } + try { + return new PeriodGranularity(new Period(value), null, null); + } + catch (IllegalArgumentException e) { + throw new IAE(StringUtils.format("%s is an invalid period string", value)); + } + } + + /** + * {@code String}-to-{@code List<String>} conversion. The string can contain zero items, + * one items, or a list. The list items are separated by a comma and optional + * whitespace. + */ + public static List<String> stringToList(String value) Review Comment: The meaning here is "take a property value which is a string that contains a comma-delimited list of values. Return a list of strings." 
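Concretely, a minimal sketch of the intended semantics (the PR's actual implementation may differ in details such as trimming): one string property holds zero, one, or many comma-delimited values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Standalone sketch of the comma-delimited property convention
// discussed here, not the PR's exact CatalogUtils.stringToList.
public class StringToListSketch
{
  static List<String> stringToList(String value)
  {
    if (value == null || value.isEmpty()) {
      return Collections.emptyList();
    }
    return Arrays.stream(value.split(","))
                 .map(String::trim)
                 .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    System.out.println(stringToList("foo.csv, bar.csv")); // [foo.csv, bar.csv]
    System.out.println(stringToList(""));                 // []
  }
}
```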
The key fact is that this is what allows a single string property to contain a list. This is important in SQL: ```sql SELECT * FROM TABLE(csv(files -> "foo.csv,bar.csv")) ``` This is not meant to be a generic list-splitter. Perhaps we will eventually need to support list members that contain commas, and thus escape them. This function would be the place to handle that specialized, property-only encoding. ########## server/src/main/java/org/apache/druid/catalog/model/SchemaRegistryImpl.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.model; + +import org.apache.druid.catalog.model.table.DatasourceDefn; +import org.apache.druid.catalog.model.table.ExternalTableDefn; +import org.apache.druid.server.security.ResourceType; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Hard-coded schema registry that knows about the well-known, and + * a few obscure, Druid schemas. Does not allow for user-defined + * schemas, which the rest of Druid would not be able to support.
+ */ +public class SchemaRegistryImpl implements SchemaRegistry +{ + // Mimics the definition in ExternalOperatorConvertion + // TODO: Change this when ExternalOperatorConvertion changes + private String EXTERNAL_RESOURCE = "EXTERNAL"; + + public static class SchemaDefnImpl implements SchemaSpec + { + private final String name; + private final String resource; + private final Set<String> accepts; + + public SchemaDefnImpl( + String name, + String resource, + Set<String> accepts + ) + { + this.name = name; + this.resource = resource; + this.accepts = accepts; + } + + @Override + public String name() + { + return name; + } + + @Override + public String securityResource() + { + return resource; + } + + @Override + public boolean writable() + { + return accepts != null && !accepts.isEmpty(); + } + + @Override + public boolean accepts(String tableType) + { + if (accepts == null) { + return false; + } + return accepts.contains(tableType); + } + } + + private final Map<String, SchemaSpec> builtIns; + + public SchemaRegistryImpl() + { + builtIns = new HashMap<>(); + register(new SchemaDefnImpl( + TableId.DRUID_SCHEMA, Review Comment: Do you mean moving the constants, such as `DRUID_SCHEMA`? the `TableId` class is meant to be more general: it is used whenever we want to talk about schema/table pairs. Ideally, the constants would not even be there, but would be in some common location. At present, we can't use the existing constants because they live in the `sql` package and the `server` package doesn't have visibility to that package. ########## server/src/main/java/org/apache/druid/catalog/model/SchemaRegistryImpl.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.catalog.model; + +import org.apache.druid.catalog.model.table.AbstractDatasourceDefn; +import org.apache.druid.catalog.model.table.ExternalTableDefn; +import org.apache.druid.server.security.ResourceType; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Hard-coded schema registry that knows about the well-known, and + * a few obscure, Druid schemas. Does not allow for user-defined + * schemas, which the rest of Druid would not be able to support. 
+ */ +public class SchemaRegistryImpl implements SchemaRegistry +{ + // Mimics the definition in ExternalOperatorConvertion + // TODO: Change this when ExternalOperatorConvertion changes + private String EXTERNAL_RESOURCE = "EXTERNAL"; + + public static class SchemaDefnImpl implements SchemaSpec + { + private final String name; + private final String resource; + private final Set<String> accepts; + + public SchemaDefnImpl( + String name, + String resource, + Set<String> accepts + ) + { + this.name = name; + this.resource = resource; + this.accepts = accepts; + } + + @Override + public String name() + { + return name; + } + + @Override + public String securityResource() + { + return resource; + } + + @Override + public boolean writable() + { + return accepts != null && !accepts.isEmpty(); + } + + @Override + public boolean accepts(String tableType) + { + if (accepts == null) { + return false; + } + return accepts.contains(tableType); + } + } + + private final Map<String, SchemaSpec> builtIns; + + public SchemaRegistryImpl() + { + builtIns = new HashMap<>(); + register(new SchemaDefnImpl( + TableId.DRUID_SCHEMA, + ResourceType.DATASOURCE, + AbstractDatasourceDefn.tableTypes() + )); + register(new SchemaDefnImpl( + TableId.LOOKUP_SCHEMA, + ResourceType.CONFIG, + null // TODO + )); + register(new SchemaDefnImpl( + TableId.CATALOG_SCHEMA, + ResourceType.SYSTEM_TABLE, + null + )); + register(new SchemaDefnImpl( + TableId.SYSTEM_SCHEMA, + ResourceType.SYSTEM_TABLE, + null + )); + register(new SchemaDefnImpl( + TableId.EXTERNAL_SCHEMA, + EXTERNAL_RESOURCE, + ExternalTableDefn.tableTypes() + )); + register(new SchemaDefnImpl( + TableId.VIEW_SCHEMA, + ResourceType.VIEW, + null // TODO Review Comment: Druid currently offers multiple view systems: one via an extension, another two available from Imply. There may be more. Eventually, it will make sense to define views in the catalog, just as is done in most DB systems.
The TODO here is a reminder that that work is pending once we get the basics working. ########## extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.catalog.storage.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import org.apache.druid.catalog.model.TableId; +import org.apache.druid.catalog.model.TableMetadata; +import org.apache.druid.catalog.model.TableSpec; +import org.apache.druid.catalog.storage.MetastoreManager; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.metadata.SQLMetadataConnector; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.Query; +import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.Update; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Function; + +@ManageLifecycle +public class SQLCatalogManager implements CatalogManager +{ + public static final String TABLES_TABLE = "tableDefs"; + + private static final String INSERT_TABLE = + "INSERT INTO %s\n" + + " (schemaName, name, creationTime, updateTime, state, payload)\n" + + " VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)"; + + private static final String UPDATE_HEAD = + "UPDATE %s\n SET\n"; + + private static final String WHERE_TABLE_ID = + "WHERE schemaName = :schemaName\n" + + " AND name = :name\n"; + + private static final String SAFETY_CHECK = + " AND updateTime = :oldVersion"; + + private static final String UPDATE_DEFN_UNSAFE = + UPDATE_HEAD + + " payload = :payload,\n" + + 
" updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String UPDATE_DEFN_SAFE = + UPDATE_DEFN_UNSAFE + + SAFETY_CHECK; + + private static final String UPDATE_STATE = + UPDATE_HEAD + + " state = :state,\n" + + " updateTime = :updateTime\n" + + WHERE_TABLE_ID; + + private static final String SELECT_TABLE = + "SELECT creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_PAYLOAD = + "SELECT state, payload\n" + + "FROM %s\n" + + WHERE_TABLE_ID; + + private static final String SELECT_ALL_TABLES = + "SELECT schemaName, name\n" + + "FROM %s\n" + + "ORDER BY schemaName, name"; + + private static final String SELECT_TABLES_IN_SCHEMA = + "SELECT name\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String SELECT_TABLE_DETAILS_IN_SCHEMA = + "SELECT name, creationTime, updateTime, state, payload\n" + + "FROM %s\n" + + "WHERE schemaName = :schemaName\n" + + "ORDER BY name"; + + private static final String DELETE_TABLE = + "DELETE FROM %s\n" + + WHERE_TABLE_ID; + + private final MetastoreManager metastoreManager; + private final SQLMetadataConnector connector; + private final ObjectMapper jsonMapper; + private final IDBI dbi; + private final String tableName; + private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>(); + + @Inject + public SQLCatalogManager(MetastoreManager metastoreManager) + { + if (!metastoreManager.isSql()) { + throw new ISE("SQLCatalogManager only works with SQL based metadata store at this time"); + } + this.metastoreManager = metastoreManager; + this.connector = metastoreManager.sqlConnector(); + this.dbi = connector.getDBI(); + this.jsonMapper = metastoreManager.jsonMapper(); + this.tableName = getTableDefnTable(); + } + + @Override + @LifecycleStart + public void start() + { + createTableDefnTable(); + } + + @Override + public void stop() + { + } + + // Mimics what MetadataStorageTablesConfig should 
do. + public String getTableDefnTable() + { + final String base = metastoreManager.tablesConfig().getBase(); + if (Strings.isNullOrEmpty(base)) { + return TABLES_TABLE; + } else { + return StringUtils.format("%s_%s", base, TABLES_TABLE); + } + } + + // TODO: Move to SqlMetadataConnector + @Override + public void createTableDefnTable() + { + if (!metastoreManager.createTables()) { + return; + } + connector.createTable( + tableName, + ImmutableList.of( + StringUtils.format( + "CREATE TABLE %s (\n" + + " schemaName VARCHAR(255) NOT NULL,\n" + + " name VARCHAR(255) NOT NULL,\n" + + " creationTime BIGINT NOT NULL,\n" + + " updateTime BIGINT NOT NULL,\n" + + " state CHAR(1) NOT NULL,\n" + + " payload %s,\n" + + " PRIMARY KEY(schemaName, name)\n" + + ")", + tableName, + connector.getPayloadType()))); + } + + @Override + public long create(TableMetadata table) throws DuplicateKeyException + { + try { + return dbi.withHandle( + new HandleCallback<Long>() + { + @Override + public Long withHandle(Handle handle) throws DuplicateKeyException + { + long updateTime = System.currentTimeMillis(); + Update stmt = handle.createStatement( + StringUtils.format(INSERT_TABLE, tableName) + ) + .bind("schemaName", table.id().schema()) + .bind("name", table.id().name()) + .bind("creationTime", updateTime) + .bind("updateTime", updateTime) + .bind("state", TableMetadata.TableState.ACTIVE.code()) + .bind("payload", table.spec().toBytes(jsonMapper)); + try { + stmt.execute(); + } + catch (UnableToExecuteStatementException e) { + if (DbUtils.isDuplicateRecordException(e)) { + throw new DuplicateKeyException( + "Tried to insert a duplicate table: " + table.sqlName(), + e); + } else { + throw e; + } + } + sendAddition(table, updateTime); + return updateTime; + } + } + ); + } + catch (CallbackFailedException e) { + if (e.getCause() instanceof DuplicateKeyException) { + throw (DuplicateKeyException) e.getCause(); + } + throw e; + } + } + + @Override + public TableMetadata read(TableId id) + { + 
+    return dbi.withHandle(
+        new HandleCallback<TableMetadata>()
+        {
+          @Override
+          public TableMetadata withHandle(Handle handle)
+          {
+            Query<Map<String, Object>> query = handle.createQuery(
+                StringUtils.format(SELECT_TABLE, tableName)
+            )
+                .setFetchSize(connector.getStreamingFetchSize())
+                .bind("schemaName", id.schema())
+                .bind("name", id.name());
+            final ResultIterator<TableMetadata> resultIterator =
+                query.map((index, r, ctx) ->
+                    new TableMetadata(
+                        id,
+                        r.getLong(1),
+                        r.getLong(2),
+                        TableMetadata.TableState.fromCode(r.getString(3)),
+                        TableSpec.fromBytes(jsonMapper, r.getBytes(4))
+                    ))
+                .iterator();
+            if (resultIterator.hasNext()) {
+              return resultIterator.next();
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  @Override
+  public long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException
+  {
+    if (oldVersion == 0) {
+      return updateUnsafe(table.id(), table.spec());
+    } else {
+      return updateSafe(table.id(), table.spec(), oldVersion);
+    }
+  }
+
+  private long updateSafe(TableId id, TableSpec defn, long oldVersion) throws OutOfDateException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws OutOfDateException
+            {
+              long updateTime = System.currentTimeMillis();
+              int updateCount = handle.createStatement(
+                  StringUtils.format(UPDATE_DEFN_SAFE, tableName))
+                  .bind("schemaName", id.schema())
+                  .bind("name", id.name())
+                  .bind("payload", defn.toBytes(jsonMapper))
+                  .bind("updateTime", updateTime)
+                  .bind("oldVersion", oldVersion)
+                  .execute();
+              if (updateCount == 0) {
+                throw new OutOfDateException(
+                    StringUtils.format(
+                        "Table %s: not found or update version does not match DB version",
+                        id.sqlName()));
+              }
+              sendUpdate(id);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof OutOfDateException) {
+        throw (OutOfDateException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  private long updateUnsafe(TableId
+      id, TableSpec defn) throws NotFoundException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws NotFoundException
+            {
+              long updateTime = System.currentTimeMillis();
+              int updateCount = handle.createStatement(
+                  StringUtils.format(UPDATE_DEFN_UNSAFE, tableName))
+                  .bind("schemaName", id.schema())
+                  .bind("name", id.name())
+                  .bind("payload", defn.toBytes(jsonMapper))
+                  .bind("updateTime", updateTime)
+                  .execute();
+              if (updateCount == 0) {
+                throw new NotFoundException(
+                    StringUtils.format("Table %s: not found", id.sqlName())
+                );
+              }
+              sendUpdate(id);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof NotFoundException) {
+        throw (NotFoundException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public long updatePayload(TableId id, Function<TableSpec, TableSpec> transform) throws NotFoundException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws NotFoundException
+            {
+              handle.begin();
+              try {
+                Query<Map<String, Object>> query = handle.createQuery(
+                    StringUtils.format(SELECT_PAYLOAD, tableName)
+                )
+                    .setFetchSize(connector.getStreamingFetchSize())
+                    .bind("schemaName", id.schema())
+                    .bind("name", id.name());
+
+                final ResultIterator<TableMetadata> resultIterator =
+                    query.map((index, r, ctx) ->
+                        new TableMetadata(
+                            id,
+                            0,
+                            0,
+                            TableMetadata.TableState.fromCode(r.getString(1)),
+                            TableSpec.fromBytes(jsonMapper, r.getBytes(2))
+                        ))
+                    .iterator();
+                TableMetadata table;
+                if (resultIterator.hasNext()) {
+                  table = resultIterator.next();
+                } else {
+                  handle.rollback();
+                  throw new NotFoundException(
+                      StringUtils.format("Table %s: not found", id.sqlName())
+                  );
+                }
+                if (table.state() != TableMetadata.TableState.ACTIVE) {
+                  throw new ISE("Table is in state [%s] and cannot be updated", table.state());
+                }
+                TableSpec revised =
+                    transform.apply(table.spec());
+                long updateTime = System.currentTimeMillis();
+                int updateCount = handle.createStatement(
+                    StringUtils.format(UPDATE_DEFN_UNSAFE, tableName))
+                    .bind("schemaName", id.schema())
+                    .bind("name", id.name())
+                    .bind("payload", revised.toBytes(jsonMapper))
+                    .bind("updateTime", updateTime)
+                    .execute();
+                if (updateCount == 0) {
+                  // Should never occur because we're holding a lock.
+                  throw new ISE("Table %s: not found", id.sqlName());
+                }
+                handle.commit();
+                sendUpdate(id);
+                return updateTime;
+              }
+              catch (RuntimeException e) {
+                handle.rollback();
+                throw e;
+              }
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof NotFoundException) {
+        throw (NotFoundException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public long markDeleting(TableId id)
+  {
+    return dbi.withHandle(
+        new HandleCallback<Long>()
+        {
+          @Override
+          public Long withHandle(Handle handle)
+          {
+            long updateTime = System.currentTimeMillis();
+            int updateCount = handle.createStatement(
+                StringUtils.format(UPDATE_STATE, tableName))
+                .bind("schemaName", id.schema())
+                .bind("name", id.name())
+                .bind("updateTime", updateTime)
+                .bind("state", TableMetadata.TableState.DELETING.code())
+                .execute();
+            sendDeletion(id);
+            return updateCount == 1 ? updateTime : 0;
+          }
+        }
+    );
+  }
+
+  @Override
+  public boolean delete(TableId id)
+  {
+    return dbi.withHandle(
+        new HandleCallback<Boolean>()
+        {
+          @Override
+          public Boolean withHandle(Handle handle)
+          {
+            int updateCount = handle.createStatement(
+                StringUtils.format(DELETE_TABLE, tableName))
+                .bind("schemaName", id.schema())
+                .bind("name", id.name())

Review Comment:
   Yes. The catalog is a set of hints on top of the physical set of segments. It can exist before any segments exist. In either case, if you don't want the hints, you can delete them at any time.
   This is different from the `DELETE TABLE` DDL case above: this just says to remove the hints, but says nothing about the actual segments.



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/MetastoreManager.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.metadata.MetadataStorageConnector;
+import org.apache.druid.metadata.MetadataStorageConnectorConfig;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+
+/**
+ * Represents the metastore manager database and its implementation.
+ * Abstracts away the various knick-knacks used to define the metastore.
+ * The metastore operations are defined via table-specific classes.
+ */
+public interface MetastoreManager
+{
+  MetadataStorageConnector connector();
+  MetadataStorageConnectorConfig config();
+  MetadataStorageTablesConfig tablesConfig();
+
+  /**
+   * Whether to create tables if they do not exist.
+   */
+  boolean createTables();
+
+  /**
+   * Object mapper to use for serializing and deserializing
+   * JSON objects stored in the metastore DB.
+   */
+  ObjectMapper jsonMapper();
+
+  /**
+   * Is the implementation SQL-based?
+   */
+  boolean isSql();
+
+  /**
+   * If SQL based, return the SQL version of the metastore
+   * connector. Throws an exception if not SQL-based.
+   */
+  SQLMetadataConnector sqlConnector();

Review Comment:
   You are probably right. This is a work in progress: I have not yet set up a way to test with one of the non-SQL storage extensions. The DB integration is rather a hack: extensions can't really depend on one another, so the extension-DB code that creates tables (etc.) can't know about the catalog (yet), and so that code lives here. Since it lives here, we have to know that we are, in fact, dealing with a SQL DB, so the code can blow up if the storage is something else (and because this extension doesn't know how to create the table-like thing that a non-SQL DB would need). Druid is in dire need of a more general DB storage API (as well as a mechanism for DB schema evolution).


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
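Editor's note on the `updateSafe` path above: the `UPDATE_DEFN_SAFE` statement appends a `SAFETY_CHECK` clause comparing the stored `updateTime` against the caller's `:oldVersion`, so a stale writer gets `updateCount == 0` and an `OutOfDateException`. The following minimal in-memory sketch (not Druid code; `OptimisticStore` and its members are illustrative names, and a counter stands in for `System.currentTimeMillis()` to keep it deterministic) shows the same optimistic-locking contract:

```java
import java.util.HashMap;
import java.util.Map;

class OptimisticStore
{
  private static class Entry
  {
    long version;
    String payload;
  }

  private final Map<String, Entry> tables = new HashMap<>();
  private long clock;

  // Insert a new spec; returns its initial version (like create()'s updateTime).
  public synchronized long create(String id, String payload)
  {
    Entry e = new Entry();
    e.version = ++clock;
    e.payload = payload;
    tables.put(id, e);
    return e.version;
  }

  // Update only if the caller's version matches the stored one. Returns the
  // new version, or 0 when the row is missing or stale (the SQL path sees
  // updateCount == 0 there and throws OutOfDateException instead).
  public synchronized long updateSafe(String id, String payload, long oldVersion)
  {
    Entry e = tables.get(id);
    if (e == null || e.version != oldVersion) {
      return 0; // stale read: caller must re-read and retry
    }
    e.version = ++clock;
    e.payload = payload;
    return e.version;
  }

  public synchronized String read(String id)
  {
    Entry e = tables.get(id);
    return e == null ? null : e.payload;
  }
}
```

The point of the pattern is that two concurrent editors cannot silently overwrite each other: whoever commits second must re-read the current spec before retrying.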

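Editor's note on the repeated `catch (CallbackFailedException e)` blocks in `create`, `updateSafe`, and `updateUnsafe` above: JDBI wraps exceptions thrown inside a `HandleCallback`, so the caller inspects `getCause()` to restore the checked exception its own signature declares. A self-contained sketch of that idiom (the classes here are stand-ins, not real JDBI types):

```java
import java.util.concurrent.Callable;

class JdbiUnwrapSketch
{
  // Stand-in for JDBI's CallbackFailedException.
  static class CallbackFailed extends RuntimeException
  {
    CallbackFailed(Throwable cause)
    {
      super(cause);
    }
  }

  // Stand-in for the catalog's NotFoundException.
  static class NotFound extends Exception
  {
    NotFound(String message)
    {
      super(message);
    }
  }

  // Stand-in for dbi.withHandle(): checked exceptions cannot cross this
  // boundary directly, so they are wrapped in an unchecked exception.
  static <T> T withHandle(Callable<T> callback)
  {
    try {
      return callback.call();
    }
    catch (Exception e) {
      throw new CallbackFailed(e);
    }
  }

  // Caller-side idiom: unwrap the cause to keep "throws NotFound" honest.
  static long update(boolean rowExists) throws NotFound
  {
    try {
      return withHandle(() -> {
        if (!rowExists) {
          throw new NotFound("table not found");
        }
        return 1L; // pretend one row was updated
      });
    }
    catch (CallbackFailed e) {
      if (e.getCause() instanceof NotFound) {
        throw (NotFound) e.getCause();
      }
      throw e;
    }
  }
}
```

Without the unwrap step, callers would see only an opaque runtime exception and could not distinguish "not found" from a genuine database failure.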