paul-rogers commented on code in PR #13165:
URL: https://github.com/apache/druid/pull/13165#discussion_r1010661428


##########
server/src/main/java/org/apache/druid/catalog/model/table/ClusterKeySpec.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+public class ClusterKeySpec
+{
+  private final String expr;
+  private final boolean desc;

Review Comment:
   I see the point. Do we have a design for the new kinds of clustering at the 
column level? For example, instead of sorting a key, might we hash it? Or, 
would we say, "do sort it, but use XYZ sort instead of ABC sort?" How would we 
represent this in SQL? At present, `CLUSTER BY` looks like a `SORT` spec, I 
believe.
   
   If we have a design, then I can update this item to match.
   
   If, on the other hand, these other items have to do with _indexing_ of 
columns (not the segment sort order), then column properties would be the way 
to go.
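To make the question concrete: the spec in the diff holds only an expression and a `desc` flag, so each key can express a sort direction and nothing else. Below is a minimal sketch of how such a list renders as a sort-style clause. It uses hypothetical `ClusterKey` and `ClusterBySketch` classes, which are not part of the PR.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for ClusterKeySpec: only the two fields shown in the diff.
final class ClusterKey
{
  final String expr;
  final boolean desc;

  ClusterKey(String expr, boolean desc)
  {
    this.expr = expr;
    this.desc = desc;
  }
}

// Hypothetical helper: renders keys as a sort-style clause such as "CLUSTER BY a, b DESC".
final class ClusterBySketch
{
  static String toClusterBy(List<ClusterKey> keys)
  {
    // Each key contributes only an expression and a direction; there is
    // nowhere to say "hash this column" or "use a different sort".
    return "CLUSTER BY " + keys.stream()
        .map(k -> k.desc ? k.expr + " DESC" : k.expr)
        .collect(Collectors.joining(", "));
  }
}
```

Supporting hashing, or an alternative sort for a given column, would need an extra field on each key (or a separate column property), which is the design question raised here.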



##########
server/src/main/java/org/apache/druid/catalog/model/ColumnSpec.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.java.util.common.IAE;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Specification of table columns. Columns have multiple types
+ * represented via the type field.
+ */
+@UnstableApi
+public class ColumnSpec
+{
+  private final String type;
+  private final String name;
+  private final String sqlType;

Review Comment:
   I agree that `ColumnType` is the actual Druid type. The `ColumnType` is the 
storage type, which is limited. For example, all complex types are represented 
as `UNKNOWN_COMPLEX`, which is the physical view, not the user view. Over time, we'll 
want an actual type name for a complex type: something like `IP_ADDRESS` rather 
than just "complex". The specific names are an active point of discussion. 
Also, `__time` is a `LONG` internally, but it is a `TIMESTAMP` semantically.
   
   We do limit the set of SQL types: only those that directly map to Druid 
types.
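Here is a minimal sketch of the distinction. It uses a hypothetical `StorageType` enum and `SqlTypeMapping` helper in place of Druid's real `ColumnType`. The catalog keeps the user-facing SQL type name and accepts only names that map directly to a storage type. Several names (such as `TIMESTAMP` and `BIGINT`) can share one physical type. The `IP_ADDRESS` entry is purely illustrative, since the naming of complex types is still under discussion.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical, simplified storage types; Druid's real class is ColumnType.
enum StorageType
{
  STRING, LONG, FLOAT, DOUBLE, COMPLEX
}

final class SqlTypeMapping
{
  // SQL (user-facing) type name -> physical storage type. Several user-facing
  // names may collapse onto one storage type.
  private static final Map<String, StorageType> SQL_TO_STORAGE = Map.of(
      "VARCHAR", StorageType.STRING,
      "BIGINT", StorageType.LONG,
      "FLOAT", StorageType.FLOAT,
      "DOUBLE", StorageType.DOUBLE,
      "TIMESTAMP", StorageType.LONG,      // e.g. __time: TIMESTAMP to users, LONG in storage
      "IP_ADDRESS", StorageType.COMPLEX   // hypothetical named complex type
  );

  // Accepts only SQL types that map directly to a storage type.
  static StorageType storageTypeOf(String sqlType)
  {
    StorageType type = SQL_TO_STORAGE.get(sqlType.toUpperCase(Locale.ROOT));
    if (type == null) {
      throw new IllegalArgumentException("Unsupported SQL type: " + sqlType);
    }
    return type;
  }
}
```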



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage.sql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.storage.MetastoreManager;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.Update;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Function;
+
+@ManageLifecycle
+public class SQLCatalogManager implements CatalogManager
+{
+  public static final String TABLES_TABLE = "tableDefs";
+
+  private static final String INSERT_TABLE =
+      "INSERT INTO %s\n" +
+      "  (schemaName, name, creationTime, updateTime, state, payload)\n" +
+      "  VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)";
+
+  private static final String UPDATE_HEAD =
+      "UPDATE %s\n SET\n";
+
+  private static final String WHERE_TABLE_ID =
+      "WHERE schemaName = :schemaName\n" +
+      "  AND name = :name\n";
+
+  private static final String SAFETY_CHECK =
+      "  AND updateTime = :oldVersion";
+
+  private static final String UPDATE_DEFN_UNSAFE =
+      UPDATE_HEAD +
+      "  payload = :payload,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String UPDATE_DEFN_SAFE =
+      UPDATE_DEFN_UNSAFE +
+      SAFETY_CHECK;
+
+  private static final String UPDATE_STATE =
+      UPDATE_HEAD +
+      "  state = :state,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_TABLE =
+      "SELECT creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_PAYLOAD =
+      "SELECT state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_ALL_TABLES =
+      "SELECT schemaName, name\n" +
+      "FROM %s\n" +
+      "ORDER BY schemaName, name";
+
+  private static final String SELECT_TABLES_IN_SCHEMA =
+      "SELECT name\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String SELECT_TABLE_DETAILS_IN_SCHEMA =
+      "SELECT name, creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String DELETE_TABLE =
+      "DELETE FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private final MetastoreManager metastoreManager;
+  private final SQLMetadataConnector connector;
+  private final ObjectMapper jsonMapper;
+  private final IDBI dbi;
+  private final String tableName;
+  private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>();
+
+  @Inject
+  public SQLCatalogManager(MetastoreManager metastoreManager)
+  {
+    if (!metastoreManager.isSql()) {
+      throw new ISE("SQLCatalogManager only works with SQL based metadata store at this time");
+    }
+    this.metastoreManager = metastoreManager;
+    this.connector = metastoreManager.sqlConnector();
+    this.dbi = connector.getDBI();
+    this.jsonMapper = metastoreManager.jsonMapper();
+    this.tableName = getTableDefnTable();
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+    createTableDefnTable();
+  }
+
+  @Override
+  public void stop()
+  {
+  }
+
+  // Mimics what MetadataStorageTablesConfig should do.
+  public String getTableDefnTable()
+  {
+    final String base = metastoreManager.tablesConfig().getBase();
+    if (Strings.isNullOrEmpty(base)) {
+      return TABLES_TABLE;
+    } else {
+      return StringUtils.format("%s_%s", base, TABLES_TABLE);
+    }
+  }
+
+  // TODO: Move to SqlMetadataConnector
+  @Override
+  public void createTableDefnTable()
+  {
+    if (!metastoreManager.createTables()) {
+      return;
+    }
+    connector.createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %s (\n"
+                + "  schemaName VARCHAR(255) NOT NULL,\n"
+                + "  name VARCHAR(255) NOT NULL,\n"
+                + "  creationTime BIGINT NOT NULL,\n"
+                + "  updateTime BIGINT NOT NULL,\n"
+                + "  state CHAR(1) NOT NULL,\n"
+                + "  payload %s,\n"
+                + "  PRIMARY KEY(schemaName, name)\n"
+                + ")",
+                tableName,
+                connector.getPayloadType())));
+  }
+
+  @Override
+  public long create(TableMetadata table) throws DuplicateKeyException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws DuplicateKeyException
+            {
+              long updateTime = System.currentTimeMillis();
+              Update stmt = handle.createStatement(
+                  StringUtils.format(INSERT_TABLE, tableName)
+              )
+                  .bind("schemaName", table.id().schema())
+                  .bind("name", table.id().name())
+                  .bind("creationTime", updateTime)
+                  .bind("updateTime", updateTime)
+                  .bind("state", TableMetadata.TableState.ACTIVE.code())
+                  .bind("payload", table.spec().toBytes(jsonMapper));
+              try {
+                stmt.execute();
+              }
+              catch (UnableToExecuteStatementException e) {
+                if (DbUtils.isDuplicateRecordException(e)) {
+                  throw new DuplicateKeyException(
+                        "Tried to insert a duplicate table: " + table.sqlName(),
+                        e);
+                } else {
+                  throw e;
+                }
+              }
+              sendAddition(table, updateTime);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof DuplicateKeyException) {
+        throw (DuplicateKeyException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public TableMetadata read(TableId id)
+  {
+    return dbi.withHandle(
+        new HandleCallback<TableMetadata>()
+        {
+          @Override
+          public TableMetadata withHandle(Handle handle)
+          {
+            Query<Map<String, Object>> query = handle.createQuery(
+                StringUtils.format(SELECT_TABLE, tableName)
+            )
+                .setFetchSize(connector.getStreamingFetchSize())
+                .bind("schemaName", id.schema())
+                .bind("name", id.name());
+            final ResultIterator<TableMetadata> resultIterator =
+                query.map((index, r, ctx) ->
+                  new TableMetadata(
+                      id,
+                      r.getLong(1),
+                      r.getLong(2),
+                      TableMetadata.TableState.fromCode(r.getString(3)),
+                      TableSpec.fromBytes(jsonMapper, r.getBytes(4))
+                  ))
+                .iterator();
+            if (resultIterator.hasNext()) {
+              return resultIterator.next();
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  @Override
+  public long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException
+  {
+    if (oldVersion == 0) {
+      return updateUnsafe(table.id(), table.spec());
+    } else {
+      return updateSafe(table.id(), table.spec(), oldVersion);
+    }
+  }
+
+  private long updateSafe(TableId id, TableSpec defn, long oldVersion) throws OutOfDateException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws OutOfDateException
+            {
+              long updateTime = System.currentTimeMillis();
+              int updateCount = handle.createStatement(
+                  StringUtils.format(UPDATE_DEFN_SAFE, tableName))
+                  .bind("schemaName", id.schema())
+                  .bind("name", id.name())
+                  .bind("payload", defn.toBytes(jsonMapper))
+                  .bind("updateTime", updateTime)
+                  .bind("oldVersion", oldVersion)
+                  .execute();
+              if (updateCount == 0) {
+                throw new OutOfDateException(
+                    StringUtils.format(
+                        "Table %s: not found or update version does not match DB version",
+                        id.sqlName()));
+              }
+              sendUpdate(id);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof OutOfDateException) {
+        throw (OutOfDateException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  private long updateUnsafe(TableId id, TableSpec defn) throws NotFoundException

Review Comment:
   Made this more explicit in both the REST API and in the DB API.
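The safe/unsafe distinction amounts to optimistic locking on `updateTime`. Below is a minimal in-memory sketch of the semantics. It uses a hypothetical `VersionedStore` rather than the JDBI code in the diff. Version `0` overwrites unconditionally. Any other version must match the stored one, or the update is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the tableDefs table, keyed by table name.
final class VersionedStore
{
  static final class NotFound extends RuntimeException
  {
    NotFound(String msg) { super(msg); }
  }

  static final class OutOfDate extends RuntimeException
  {
    OutOfDate(String msg) { super(msg); }
  }

  private static final class Row
  {
    final long version;
    final String payload;

    Row(long version, String payload)
    {
      this.version = version;
      this.payload = payload;
    }
  }

  private final Map<String, Row> rows = new HashMap<>();
  private long counter = 0;

  synchronized long create(String name, String payload)
  {
    if (rows.containsKey(name)) {
      throw new IllegalStateException("Duplicate table: " + name);
    }
    long version = ++counter;
    rows.put(name, new Row(version, payload));
    return version;
  }

  // oldVersion == 0: unconditional overwrite ("unsafe", last writer wins).
  // Otherwise: write only if the stored version still equals oldVersion,
  // like "... AND updateTime = :oldVersion" in UPDATE_DEFN_SAFE.
  synchronized long update(String name, String payload, long oldVersion)
  {
    Row row = rows.get(name);
    if (oldVersion == 0 && row == null) {
      throw new NotFound("Table not found: " + name);
    }
    if (oldVersion != 0 && (row == null || row.version != oldVersion)) {
      throw new OutOfDate("Not found or version mismatch: " + name);
    }
    long version = ++counter;
    rows.put(name, new Row(version, payload));
    return version;
  }

  synchronized String read(String name)
  {
    Row row = rows.get(name);
    return row == null ? null : row.payload;
  }
}
```

The sketch uses a counter instead of `System.currentTimeMillis()` so that two updates never receive the same version. With millisecond timestamps, two writes that land in the same millisecond would share a version, which may be worth keeping in mind for the real implementation.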



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/CatalogManager.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage.sql;
+
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Manages catalog data. Used in Coordinator, which will be in either
+ * a leader or standby state. The Coordinator calls the {@link #start()}
+ * method when it becomes the leader.
+ *
+ * Performs detailed CRUD operations on the catalog tables table.
+ * Higher-level operations appear elsewhere.
+ */
+public interface CatalogManager
+{
+  /**
+   * Thrown when an "optimistic lock" fails: the version of a
+   * catalog object being updated is not the same as that of
+   * the expected version.
+   */
+  class OutOfDateException extends Exception
+  {
+    public OutOfDateException(String msg)
+    {
+      super(msg);
+    }
+  }
+
+  /**
+   * Thrown when a record does not exist in the database. Allows
+   * the caller to check for this specific case in a generic way.
+   */
+  class NotFoundException extends Exception
+  {
+    public NotFoundException(String msg)
+    {
+      super(msg);
+    }
+  }
+
+  /**
+   * Indicates an attempt to insert a duplicate key into a table.
+   * This could indicate a logic error, or a race condition. It is
+   * generally not retryable: it is unrealistic to expect the other
+   * thread to helpfully delete the record it just added.
+   */
+  class DuplicateKeyException extends Exception
+  {
+    public DuplicateKeyException(String msg, Exception e)
+    {
+      super(msg, e);
+    }
+  }
+
+  /**
+   * Generic interface for changes to the catalog at the storage level.
+   * Implemented by the catalog sync mechanism to send update events
+   * to the Broker. Note that these events are about the <i>catalog</i>,
+   * not about the physical storage of tables (i.e. datasources.)
+   */
+  interface Listener
+  {
+    /**
+     * A new catalog table entry was added.
+     */
+    void added(TableMetadata table);
+
+    /**
+     * An existing catalog table entry was updated.
+     */
+    void updated(TableMetadata table);
+
+    /**
+     * An existing catalog table entry was deleted.
+     */
+    void deleted(TableId id);
+  }
+
+  /**
+   * Start the catalog manager within a Druid run. Called from lifecycle
+   * management and when a coordinator becomes the leader node.
+   */
+  void start();
+
+  /**
+   * Register a listener for catalog events.
+   */
+  void register(Listener listener);
+
+  /**
+   * Create a table entry.
+   *
+   * @return the version of the newly created table. Call
+   * {@link TableMetadata#asUpdate(long)} if you want a new
+   * {@link TableMetadata} with the new version.
+   * @throws DuplicateKeyException if the row is a duplicate
+   * (schema, name) pair. This generally indicates a code error,
+   * or since our code is perfect, a race condition or a DB
+   * update outside of Druid. In any event, the error is not
+   * retryable: the user should pick another name, or update the
+   * existing table.
+   */
+  long create(TableMetadata table) throws DuplicateKeyException;
+
+  /**
+   * Update a table definition.
+   * <p>
+   * If {@code oldVersion == 0}, overwrites any current content.
+   * This is a potential race condition if this is a partial update,
+   * because another user may have updated the table since the
+   * read. Fine when the goal is to replace the entire definition.
+   * Otherwise, performs the update only if the table is at the given version.
+   * <p>
+   * Retryable only if the version is given, and an
+   * {@code OutOfDateException} is thrown.
+   */
+  long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException;
+
+  /**
+   * Update the table spec incrementally using the transform provided. Performs the update
+   * in a transaction to ensure the read and write are atomic.
+   *
+   * @param id        the table to update
+   * @param transform the transform to apply to the table spec
+   * @return          the update timestamp (version) of the updated record
+   */
+  long updatePayload(TableId id, Function<TableSpec, TableSpec> transform) throws NotFoundException;
+
+  /**
+   * Move the table to the deleting state. No version check: fine
+   * if the table is already in the deleting state. Does nothing if the
+   * table does not exist.
+   *
+   * @return new table update timestamp, or 0 if the table does not
+   * exist
+   */
+  long markDeleting(TableId id);
+
+  /**
+   * Read the table record for the given ID.
+   *
+   * @return the table record, or {@code null} if the entry is not
+   * found in the DB.
+   */
+  @Nullable TableMetadata read(TableId id);
+
+  /**
+   * Delete the table record for the given ID. Essentially does a
+   * "DELETE IF EXISTS". There is no version check. Delete should be
+   * called only when there are no segments left for the table: use
+   * {@link #markDeleting(TableId)} to indicate that the segments are

Review Comment:
   The catalog has no way to trigger segment deletion itself. Instead, 
the plan is that a DDL statement, `DELETE TABLE foo`, would first use this 
action to mark the table as being deleted, then call out to the Overlord (OL) to run 
the table deletion task. Then, at the completion of that task, it calls the 
Coordinator to delete the table metadata.
   
   The result is that the table is no longer queryable after `DELETE TABLE` 
(because of the deleting state), but the name is not yet reusable. Once the 
segments are gone, the name becomes available for reuse.
   
   That will all come when we add DDL statements.
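The delete flow described above is a small state machine. Below is a minimal sketch of it. It assumes a hypothetical in-memory `DeleteLifecycle` class and `EntryState` enum; the real code stores a state code in the `state` column. Marking a table as deleting makes it unqueryable, but the name stays reserved until the metadata entry is finally removed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical states mirroring the "active" and "deleting" table states.
enum EntryState
{
  ACTIVE, DELETING
}

final class DeleteLifecycle
{
  private final Map<String, EntryState> tables = new HashMap<>();

  // A name can be (re)used only when no entry, active or deleting, exists.
  boolean create(String name)
  {
    return tables.putIfAbsent(name, EntryState.ACTIVE) == null;
  }

  boolean isQueryable(String name)
  {
    return tables.get(name) == EntryState.ACTIVE;
  }

  // Step 1 of DELETE TABLE: idempotent, and a no-op if the table is absent.
  void markDeleting(String name)
  {
    tables.computeIfPresent(name, (k, v) -> EntryState.DELETING);
  }

  // Final step: called once the segment-deletion task has completed.
  void delete(String name)
  {
    tables.remove(name);
  }
}
```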



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/sync/CatalogUpdateNotifier.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.sync;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.catalog.http.CatalogListenerResource;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.storage.CatalogStorage;
+import org.apache.druid.catalog.sync.MetadataCatalog.CatalogListener;
+import org.apache.druid.catalog.sync.RestUpdateSender.RestSender;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.annotations.EscalatedClient;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.DruidNode;
+import org.joda.time.Duration;
+
+import javax.inject.Inject;
+
+import java.util.Collections;
+import java.util.function.Supplier;
+
+/**
+ * Global update notifier for the catalog. Registers itself as a catalog
+ * listener, then uses the common cache notifier to send Smile-encoded JSON
+ * updates to broker nodes discovered from node discovery (typically ZooKeeper.)
+ * <p>
+ * Deletes are encoded as a table update with a table definition of a special
+ * tombstone type. This avoids the need for two endpoints, or for
+ * a wrapper class to handle deletes.
+ */
+@ManageLifecycle
+public class CatalogUpdateNotifier implements CatalogListener
+{
+  private static final String CALLER_NAME = "Catalog Sync";
+  private static final long TIMEOUT_MS = 5000;
+
+  /**
+   * Internal table type used in updates to notify listeners that a table has
+   * been deleted. Avoids the need for a special "table deleted" message.
+   */
+  public static final String TOMBSTONE_TABLE_TYPE = "tombstone";
+  private static final TableSpec TABLE_TOMBSTONE = new TableSpec(TOMBSTONE_TABLE_TYPE, null, null);
+
+  private final CacheNotifier notifier;
+  private final ObjectMapper smileMapper;
+
+  @Inject
+  public CatalogUpdateNotifier(
+      CatalogStorage catalog,
+      DruidNodeDiscoveryProvider discoveryProvider,
+      @EscalatedClient HttpClient httpClient,
+      @Smile ObjectMapper smileMapper
+  )
+  {
+    long timeoutMs = TIMEOUT_MS;
+    this.smileMapper = smileMapper;
+    Supplier<Iterable<DruidNode>> nodeSupplier = new ListeningNodeSupplier(
+        Collections.singletonList(NodeRole.BROKER),
+        discoveryProvider);
+    RestSender restSender = RestUpdateSender.httpClientSender(httpClient, Duration.millis(timeoutMs));
+    RestUpdateSender sender = new RestUpdateSender(
+        CALLER_NAME,
+        nodeSupplier,
+        restSender,
+        CatalogListenerResource.BASE_URL + CatalogListenerResource.SYNC_URL,
+        timeoutMs);
+    this.notifier = new CacheNotifier(
+        CALLER_NAME,
+        sender);
+    catalog.register(this);

Review Comment:
   How do we handle this issue generically? I'd kind of hoped that the 
Coordinator itself would reject messages if it is the follower.
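One possible generic approach is to gate outgoing notifications on leadership. This sketch assumes a leadership flag is available, for example from leader-election callbacks. `LeaderGatedNotifier` is hypothetical and not part of the PR.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical wrapper: forwards catalog events only while this node is leader.
final class LeaderGatedNotifier<T> implements Consumer<T>
{
  private final BooleanSupplier isLeader;
  private final Consumer<T> delegate;

  LeaderGatedNotifier(BooleanSupplier isLeader, Consumer<T> delegate)
  {
    this.isLeader = isLeader;
    this.delegate = delegate;
  }

  @Override
  public void accept(T event)
  {
    // Checked per event, so a node that gains or loses leadership
    // starts or stops forwarding without re-registering.
    if (isLeader.getAsBoolean()) {
      delegate.accept(event);
    }
    // Otherwise the event is dropped: followers send nothing to Brokers.
  }
}
```

This only suppresses notifications. Rejecting the catalog write itself on a follower would also keep the follower from changing the metadata store.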



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage.sql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.storage.MetastoreManager;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.Update;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Function;
+
+@ManageLifecycle
+public class SQLCatalogManager implements CatalogManager
+{
+  public static final String TABLES_TABLE = "tableDefs";
+
+  private static final String INSERT_TABLE =
+      "INSERT INTO %s\n" +
+      "  (schemaName, name, creationTime, updateTime, state, payload)\n" +
+      "  VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)";
+
+  private static final String UPDATE_HEAD =
+      "UPDATE %s\n SET\n";
+
+  private static final String WHERE_TABLE_ID =
+      "WHERE schemaName = :schemaName\n" +
+      "  AND name = :name\n";
+
+  private static final String SAFETY_CHECK =
+      "  AND updateTime = :oldVersion";
+
+  private static final String UPDATE_DEFN_UNSAFE =
+      UPDATE_HEAD +
+      "  payload = :payload,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String UPDATE_DEFN_SAFE =
+      UPDATE_DEFN_UNSAFE +
+      SAFETY_CHECK;
+
+  private static final String UPDATE_STATE =
+      UPDATE_HEAD +
+      "  state = :state,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_TABLE =
+      "SELECT creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_PAYLOAD =
+      "SELECT state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_ALL_TABLES =
+      "SELECT schemaName, name\n" +
+      "FROM %s\n" +
+      "ORDER BY schemaName, name";
+
+  private static final String SELECT_TABLES_IN_SCHEMA =
+      "SELECT name\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String SELECT_TABLE_DETAILS_IN_SCHEMA =
+      "SELECT name, creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String DELETE_TABLE =
+      "DELETE FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private final MetastoreManager metastoreManager;
+  private final SQLMetadataConnector connector;
+  private final ObjectMapper jsonMapper;
+  private final IDBI dbi;
+  private final String tableName;
+  private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>();
+
+  @Inject
+  public SQLCatalogManager(MetastoreManager metastoreManager)
+  {
+    if (!metastoreManager.isSql()) {
+      throw new ISE("SQLCatalogManager only works with SQL based metadata store at this time");
+    }
+    this.metastoreManager = metastoreManager;
+    this.connector = metastoreManager.sqlConnector();
+    this.dbi = connector.getDBI();
+    this.jsonMapper = metastoreManager.jsonMapper();
+    this.tableName = getTableDefnTable();
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+    createTableDefnTable();
+  }
+
+  @Override
+  public void stop()
+  {
+  }
+
+  // Mimics what MetadataStorageTablesConfig should do.
+  public String getTableDefnTable()
+  {
+    final String base = metastoreManager.tablesConfig().getBase();
+    if (Strings.isNullOrEmpty(base)) {
+      return TABLES_TABLE;
+    } else {
+      return StringUtils.format("%s_%s", base, TABLES_TABLE);
+    }
+  }
+
+  // TODO: Move to SqlMetadataConnector
+  @Override
+  public void createTableDefnTable()
+  {
+    if (!metastoreManager.createTables()) {
+      return;
+    }
+    connector.createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %s (\n"
+                + "  schemaName VARCHAR(255) NOT NULL,\n"
+                + "  name VARCHAR(255) NOT NULL,\n"
+                + "  creationTime BIGINT NOT NULL,\n"
+                + "  updateTime BIGINT NOT NULL,\n"
+                + "  state CHAR(1) NOT NULL,\n"
+                + "  payload %s,\n"
+                + "  PRIMARY KEY(schemaName, name)\n"
+                + ")",
+                tableName,
+                connector.getPayloadType())));
+  }
+
+  @Override
+  public long create(TableMetadata table) throws DuplicateKeyException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws DuplicateKeyException
+            {
+              long updateTime = System.currentTimeMillis();
+              Update stmt = handle.createStatement(
+                  StringUtils.format(INSERT_TABLE, tableName)
+              )
+                  .bind("schemaName", table.id().schema())
+                  .bind("name", table.id().name())
+                  .bind("creationTime", updateTime)
+                  .bind("updateTime", updateTime)
+                  .bind("state", TableMetadata.TableState.ACTIVE.code())
+                  .bind("payload", table.spec().toBytes(jsonMapper));
+              try {
+                stmt.execute();
+              }
+              catch (UnableToExecuteStatementException e) {
+                if (DbUtils.isDuplicateRecordException(e)) {
+                  throw new DuplicateKeyException(
+                        "Tried to insert a duplicate table: " + table.sqlName(),
+                        e);
+                } else {
+                  throw e;
+                }
+              }
+              sendAddition(table, updateTime);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof DuplicateKeyException) {
+        throw (DuplicateKeyException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public TableMetadata read(TableId id)
+  {
+    return dbi.withHandle(
+        new HandleCallback<TableMetadata>()
+        {
+          @Override
+          public TableMetadata withHandle(Handle handle)
+          {
+            Query<Map<String, Object>> query = handle.createQuery(
+                StringUtils.format(SELECT_TABLE, tableName)
+            )
+                .setFetchSize(connector.getStreamingFetchSize())
+                .bind("schemaName", id.schema())
+                .bind("name", id.name());
+            final ResultIterator<TableMetadata> resultIterator =
+                query.map((index, r, ctx) ->
+                  new TableMetadata(
+                      id,
+                      r.getLong(1),
+                      r.getLong(2),
+                      TableMetadata.TableState.fromCode(r.getString(3)),
+                      TableSpec.fromBytes(jsonMapper, r.getBytes(4))
+                  ))
+                .iterator();
+            if (resultIterator.hasNext()) {
+              return resultIterator.next();
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  @Override
+  public long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException
+  {
+    if (oldVersion == 0) {
+      return updateUnsafe(table.id(), table.spec());
+    } else {
+      return updateSafe(table.id(), table.spec(), oldVersion);
+    }
+  }
+
+  private long updateSafe(TableId id, TableSpec defn, long oldVersion) throws OutOfDateException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws OutOfDateException
+            {
+              long updateTime = System.currentTimeMillis();
+              int updateCount = handle.createStatement(
+                  StringUtils.format(UPDATE_DEFN_SAFE, tableName))
+                  .bind("schemaName", id.schema())
+                  .bind("name", id.name())
+                  .bind("payload", defn.toBytes(jsonMapper))
+                  .bind("updateTime", updateTime)
+                  .bind("oldVersion", oldVersion)
+                  .execute();
+              if (updateCount == 0) {
+                throw new OutOfDateException(
+                    StringUtils.format(
+                        "Table %s: not found or update version does not match DB version",
+                        id.sqlName()));
+              }
+              sendUpdate(id);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof OutOfDateException) {
+        throw (OutOfDateException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  private long updateUnsafe(TableId id, TableSpec defn) throws NotFoundException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws NotFoundException
+            {
+              long updateTime = System.currentTimeMillis();
+              int updateCount = handle.createStatement(
+                  StringUtils.format(UPDATE_DEFN_UNSAFE, tableName))
+                  .bind("schemaName", id.schema())
+                  .bind("name", id.name())
+                  .bind("payload", defn.toBytes(jsonMapper))
+                  .bind("updateTime", updateTime)
+                  .execute();
+              if (updateCount == 0) {
+                throw new NotFoundException(
+                    StringUtils.format("Table %s: not found", id.sqlName())
+                );
+              }
+              sendUpdate(id);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof NotFoundException) {
+        throw (NotFoundException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public long updatePayload(TableId id, Function<TableSpec, TableSpec> transform) throws NotFoundException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws NotFoundException
+            {
+              handle.begin();
+              try {
+                Query<Map<String, Object>> query = handle.createQuery(
+                    StringUtils.format(SELECT_PAYLOAD, tableName)
+                )
+                    .setFetchSize(connector.getStreamingFetchSize())
+                    .bind("schemaName", id.schema())
+                    .bind("name", id.name());
+
+                final ResultIterator<TableMetadata> resultIterator =
+                    query.map((index, r, ctx) ->
+                      new TableMetadata(
+                          id,
+                          0,
+                          0,
+                          TableMetadata.TableState.fromCode(r.getString(1)),
+                          TableSpec.fromBytes(jsonMapper, r.getBytes(2))
+                      ))
+                    .iterator();
+                TableMetadata table;
+                if (resultIterator.hasNext()) {
+                  table = resultIterator.next();
+                } else {
+                  handle.rollback();
+                  throw new NotFoundException(
+                      StringUtils.format("Table %s: not found", id.sqlName())
+                  );
+                }
+                if (table.state() != TableMetadata.TableState.ACTIVE) {
+                  throw new ISE("Table is in state [%s] and cannot be updated", table.state());
+                }
+                TableSpec revised = transform.apply(table.spec());
+                long updateTime = System.currentTimeMillis();
+                int updateCount = handle.createStatement(
+                    StringUtils.format(UPDATE_DEFN_UNSAFE, tableName))
+                    .bind("schemaName", id.schema())
+                    .bind("name", id.name())
+                    .bind("payload", revised.toBytes(jsonMapper))
+                    .bind("updateTime", updateTime)
+                    .execute();
+                if (updateCount == 0) {
+                  // Should never occur because we're holding a lock.
+                  throw new ISE("Table %s: not found", id.sqlName());
+                }
+                handle.commit();
+                sendUpdate(id);
+                return updateTime;
+              }
+              catch (RuntimeException e) {

Review Comment:
   The only checked exception that can occur in this block is `NotFoundException`, which is already handled. This block is for any "invisible" unchecked exception.

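   A minimal sketch of the pattern described above, assuming a hypothetical `TxHandle` interface as a stand-in for JDBI's `Handle` (only `begin`, `commit`, and `rollback` are modeled; `inTransaction` is likewise illustrative, not code from the PR). The checked `NotFoundException` gets its own branch, and the `RuntimeException` branch catches everything else, so the transaction is not left open on either path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: TxHandle stands in for JDBI's Handle; none of these names come from the PR.
final class RollbackSketch
{
  interface TxHandle
  {
    void begin();

    void commit();

    void rollback();
  }

  // Records each call so the commit/rollback behavior can be checked.
  static final class RecordingHandle implements TxHandle
  {
    final List<String> calls = new ArrayList<>();

    @Override
    public void begin()
    {
      calls.add("begin");
    }

    @Override
    public void commit()
    {
      calls.add("commit");
    }

    @Override
    public void rollback()
    {
      calls.add("rollback");
    }
  }

  static class NotFoundException extends Exception
  {
    NotFoundException(String msg)
    {
      super(msg);
    }
  }

  interface TxBody<T>
  {
    T run(TxHandle handle) throws NotFoundException;
  }

  // Runs body inside a transaction: commit on success, roll back on any failure.
  static <T> T inTransaction(TxHandle handle, TxBody<T> body) throws NotFoundException
  {
    handle.begin();
    try {
      T result = body.run(handle);
      handle.commit();
      return result;
    }
    catch (NotFoundException e) {
      // The one checked exception: undo the transaction, then report it.
      handle.rollback();
      throw e;
    }
    catch (RuntimeException e) {
      // Any "invisible" unchecked exception (driver error, ISE, ...) also rolls back.
      handle.rollback();
      throw e;
    }
  }
}
```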


##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage.sql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.storage.MetastoreManager;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.Update;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Function;
+
+@ManageLifecycle
+public class SQLCatalogManager implements CatalogManager
+{
+  public static final String TABLES_TABLE = "tableDefs";
+
+  private static final String INSERT_TABLE =
+      "INSERT INTO %s\n" +
+      "  (schemaName, name, creationTime, updateTime, state, payload)\n" +
+      "  VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)";
+
+  private static final String UPDATE_HEAD =
+      "UPDATE %s\n SET\n";
+
+  private static final String WHERE_TABLE_ID =
+      "WHERE schemaName = :schemaName\n" +
+      "  AND name = :name\n";
+
+  private static final String SAFETY_CHECK =
+      "  AND updateTime = :oldVersion";
+
+  private static final String UPDATE_DEFN_UNSAFE =
+      UPDATE_HEAD +
+      "  payload = :payload,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String UPDATE_DEFN_SAFE =
+      UPDATE_DEFN_UNSAFE +
+      SAFETY_CHECK;
+
+  private static final String UPDATE_STATE =
+      UPDATE_HEAD +
+      "  state = :state,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_TABLE =
+      "SELECT creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_PAYLOAD =
+      "SELECT state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_ALL_TABLES =
+      "SELECT schemaName, name\n" +
+      "FROM %s\n" +
+      "ORDER BY schemaName, name";
+
+  private static final String SELECT_TABLES_IN_SCHEMA =
+      "SELECT name\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String SELECT_TABLE_DETAILS_IN_SCHEMA =
+      "SELECT name, creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String DELETE_TABLE =
+      "DELETE FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private final MetastoreManager metastoreManager;
+  private final SQLMetadataConnector connector;
+  private final ObjectMapper jsonMapper;
+  private final IDBI dbi;
+  private final String tableName;
+  private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>();
+
+  @Inject
+  public SQLCatalogManager(MetastoreManager metastoreManager)
+  {
+    if (!metastoreManager.isSql()) {
+      throw new ISE("SQLCatalogManager only works with SQL based metadata store at this time");
+    }
+    this.metastoreManager = metastoreManager;
+    this.connector = metastoreManager.sqlConnector();
+    this.dbi = connector.getDBI();
+    this.jsonMapper = metastoreManager.jsonMapper();
+    this.tableName = getTableDefnTable();
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+    createTableDefnTable();
+  }
+
+  @Override
+  public void stop()
+  {
+  }
+
+  // Mimics what MetadataStorageTablesConfig should do.
+  public String getTableDefnTable()
+  {
+    final String base = metastoreManager.tablesConfig().getBase();
+    if (Strings.isNullOrEmpty(base)) {
+      return TABLES_TABLE;
+    } else {
+      return StringUtils.format("%s_%s", base, TABLES_TABLE);
+    }
+  }
+
+  // TODO: Move to SqlMetadataConnector
+  @Override
+  public void createTableDefnTable()
+  {
+    if (!metastoreManager.createTables()) {
+      return;
+    }
+    connector.createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %s (\n"

Review Comment:
   Went ahead and extracted the statement to a constant. Not much I can do about DBI: that's what Druid uses.
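   For illustration only, the extracted statement might look like the sketch below. The constant name `CREATE_TABLE` and the helper `createTableSql` are hypothetical, and `String.format` stands in for Druid's `StringUtils.format`; the column list is copied from the `createTableDefnTable()` hunk above.

```java
import java.util.Locale;

// Hypothetical sketch of the CREATE TABLE statement pulled out into a constant.
final class CreateTableSketch
{
  // Placeholders: the table name, then the connector's payload column type.
  static final String CREATE_TABLE =
      "CREATE TABLE %s (\n"
      + "  schemaName VARCHAR(255) NOT NULL,\n"
      + "  name VARCHAR(255) NOT NULL,\n"
      + "  creationTime BIGINT NOT NULL,\n"
      + "  updateTime BIGINT NOT NULL,\n"
      + "  state CHAR(1) NOT NULL,\n"
      + "  payload %s,\n"
      + "  PRIMARY KEY(schemaName, name)\n"
      + ")";

  // Fills in the placeholders; String.format stands in for StringUtils.format.
  static String createTableSql(String tableName, String payloadType)
  {
    return String.format(Locale.ENGLISH, CREATE_TABLE, tableName, payloadType);
  }
}
```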



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage.sql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.storage.MetastoreManager;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.Update;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Function;
+
+@ManageLifecycle
+public class SQLCatalogManager implements CatalogManager
+{
+  public static final String TABLES_TABLE = "tableDefs";
+
+  private static final String INSERT_TABLE =
+      "INSERT INTO %s\n" +
+      "  (schemaName, name, creationTime, updateTime, state, payload)\n" +
+      "  VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)";
+
+  private static final String UPDATE_HEAD =
+      "UPDATE %s\n SET\n";
+
+  private static final String WHERE_TABLE_ID =
+      "WHERE schemaName = :schemaName\n" +
+      "  AND name = :name\n";
+
+  private static final String SAFETY_CHECK =
+      "  AND updateTime = :oldVersion";
+
+  private static final String UPDATE_DEFN_UNSAFE =
+      UPDATE_HEAD +
+      "  payload = :payload,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String UPDATE_DEFN_SAFE =
+      UPDATE_DEFN_UNSAFE +
+      SAFETY_CHECK;

Review Comment:
   Went ahead and converted (almost) all statements to the fully expanded form.
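   As an illustration of what "fully expanded" can mean, here is `UPDATE_DEFN_SAFE` written both ways. The fragments are copied from the diff above (including the leading space before `SET`); the class name is hypothetical, and the test only checks that the expanded text is identical to the composed text, so the rewrite should not change the SQL sent to the database.

```java
// Hypothetical sketch comparing the composed and fully expanded forms of one statement.
final class ExpandedSqlSketch
{
  // Composed form, built from the fragments shown in the diff.
  private static final String UPDATE_HEAD = "UPDATE %s\n SET\n";
  private static final String WHERE_TABLE_ID =
      "WHERE schemaName = :schemaName\n" +
      "  AND name = :name\n";
  private static final String SAFETY_CHECK = "  AND updateTime = :oldVersion";
  private static final String UPDATE_DEFN_UNSAFE =
      UPDATE_HEAD +
      "  payload = :payload,\n" +
      "  updateTime = :updateTime\n" +
      WHERE_TABLE_ID;
  static final String COMPOSED_UPDATE_DEFN_SAFE = UPDATE_DEFN_UNSAFE + SAFETY_CHECK;

  // Fully expanded form: the whole statement can be read in one place.
  static final String EXPANDED_UPDATE_DEFN_SAFE =
      "UPDATE %s\n" +
      " SET\n" +
      "  payload = :payload,\n" +
      "  updateTime = :updateTime\n" +
      "WHERE schemaName = :schemaName\n" +
      "  AND name = :name\n" +
      "  AND updateTime = :oldVersion";
}
```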



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/CatalogManager.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage.sql;
+
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Manages catalog data. Used in Coordinator, which will be in either
+ * a leader or standby state. The Coordinator calls the {@link #start()}
+ * method when it becomes the leader, and calls {@link #stop()} when
+ * it loses leadership, or shuts down.
+ *
+ * Performs detailed CRUD operations on the catalog tables table.
+ * Higher-level operations appear elsewhere.
+ */
+public interface CatalogManager
+{
+  /**
+   * Thrown when an "optimistic lock" fails: the version of a
+   * catalog object being updated is not the same as that of
+   * the expected version.
+   */
+  class OutOfDateException extends Exception
+  {
+    public OutOfDateException(String msg)
+    {
+      super(msg);
+    }
+  }
+
+  class NotFoundException extends Exception
+  {
+    public NotFoundException(String msg)
+    {
+      super(msg);
+    }
+  }
+
+  /**
+   * Indicates an attempt to insert a duplicate key into a table.
+   * This could indicate a logic error, or a race condition. It is
+   * generally not retryable: it is unrealistic to expect the other
+   * thread to helpfully delete the record it just added.
+   */
+  class DuplicateKeyException extends Exception
+  {
+    public DuplicateKeyException(String msg, Exception e)
+    {
+      super(msg, e);
+    }
+  }
+
+  interface Listener
+  {
+    void added(TableMetadata table);
+    void updated(TableMetadata table);
+    void deleted(TableId id);
+  }
+
+  void start();
+
+
+  void register(Listener listener);
+  void createTableDefnTable();
+
+  /**
+   * Create a table entry.
+   *
+   * @return the version of the newly created table. Call
+   * {@link TableMetadata#asUpdate(long)} if you want a new
+   * {@link TableMetadata} with the new version.
+   * @throws DuplicateKeyException if the row is a duplicate
+   * (schema, name) pair. This generally indicates a code error,
+   * or, since our code is perfect, a race condition or a DB
+   * update outside of Druid. In any event, the error is not
+   * retryable: the user should pick another name, or update the
+   * existing table.
+   */
+  long create(TableMetadata table) throws DuplicateKeyException;
+
+  /**
+   * Update a table definition.
+   * <p>
+   * If {@code oldVersion == 0}, overwrites any current content.
+   * This is a potential race condition if this is a partial update,
+   * because another user may have updated the table since the
+   * read. Fine when the goal is to replace the entire definition.
+   * Otherwise, performs the update only if the table is at the given version.
+   * <p>
+   * Retryable only if the version is given, and an
+   * {@code OutOfDateException} is thrown.
+   */
+  long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException;

Review Comment:
   The revised version clearly separates the two use cases at both the REST API and DB API levels.
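   A minimal in-memory sketch of the split, assuming two separate entry points: one that replaces the whole definition unconditionally and one that applies an optimistic-lock check. The names `replace` and `updateIfVersion` are hypothetical, not the revised PR's API. As in `updateSafe()`, a missing table and a version mismatch both surface as `OutOfDateException`. The sketch bumps a counter for each version instead of calling `System.currentTimeMillis()`, so two updates in the same millisecond cannot produce the same version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the catalog table; not the PR's actual API.
final class SplitUpdateSketch
{
  static class OutOfDateException extends Exception
  {
    OutOfDateException(String msg)
    {
      super(msg);
    }
  }

  static class NotFoundException extends Exception
  {
    NotFoundException(String msg)
    {
      super(msg);
    }
  }

  // A stored definition and its version (the update time in the real code).
  static final class Entry
  {
    final String spec;
    final long version;

    Entry(String spec, long version)
    {
      this.spec = spec;
      this.version = version;
    }
  }

  private final Map<String, Entry> tables = new HashMap<>();
  private long clock = 0;

  synchronized long create(String name, String spec)
  {
    long version = ++clock;
    tables.put(name, new Entry(spec, version));
    return version;
  }

  // Use case 1: replace the entire definition, whatever version is stored.
  synchronized long replace(String name, String spec) throws NotFoundException
  {
    if (!tables.containsKey(name)) {
      throw new NotFoundException("Table " + name + ": not found");
    }
    long version = ++clock;
    tables.put(name, new Entry(spec, version));
    return version;
  }

  // Use case 2: update only if the stored version matches (optimistic lock).
  synchronized long updateIfVersion(String name, String spec, long expected) throws OutOfDateException
  {
    Entry current = tables.get(name);
    if (current == null || current.version != expected) {
      throw new OutOfDateException("Table " + name + ": not found or update version does not match DB version");
    }
    long version = ++clock;
    tables.put(name, new Entry(spec, version));
    return version;
  }
}
```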



##########
extensions-core/druid-catalog/src/main/java/org/apache/druid/catalog/storage/sql/SQLCatalogManager.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.storage.sql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.druid.catalog.model.TableId;
+import org.apache.druid.catalog.model.TableMetadata;
+import org.apache.druid.catalog.model.TableSpec;
+import org.apache.druid.catalog.storage.MetastoreManager;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.IDBI;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.Update;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
+import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Function;
+
+@ManageLifecycle
+public class SQLCatalogManager implements CatalogManager
+{
+  public static final String TABLES_TABLE = "tableDefs";
+
+  private static final String INSERT_TABLE =
+      "INSERT INTO %s\n" +
+      "  (schemaName, name, creationTime, updateTime, state, payload)\n" +
+      "  VALUES(:schemaName, :name, :creationTime, :updateTime, :state, :payload)";
+
+  private static final String UPDATE_HEAD =
+      "UPDATE %s\n SET\n";
+
+  private static final String WHERE_TABLE_ID =
+      "WHERE schemaName = :schemaName\n" +
+      "  AND name = :name\n";
+
+  private static final String SAFETY_CHECK =
+      "  AND updateTime = :oldVersion";
+
+  private static final String UPDATE_DEFN_UNSAFE =
+      UPDATE_HEAD +
+      "  payload = :payload,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String UPDATE_DEFN_SAFE =
+      UPDATE_DEFN_UNSAFE +
+      SAFETY_CHECK;
+
+  private static final String UPDATE_STATE =
+      UPDATE_HEAD +
+      "  state = :state,\n" +
+      "  updateTime = :updateTime\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_TABLE =
+      "SELECT creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_PAYLOAD =
+      "SELECT state, payload\n" +
+      "FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private static final String SELECT_ALL_TABLES =
+      "SELECT schemaName, name\n" +
+      "FROM %s\n" +
+      "ORDER BY schemaName, name";
+
+  private static final String SELECT_TABLES_IN_SCHEMA =
+      "SELECT name\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String SELECT_TABLE_DETAILS_IN_SCHEMA =
+      "SELECT name, creationTime, updateTime, state, payload\n" +
+      "FROM %s\n" +
+      "WHERE schemaName = :schemaName\n" +
+      "ORDER BY name";
+
+  private static final String DELETE_TABLE =
+      "DELETE FROM %s\n" +
+      WHERE_TABLE_ID;
+
+  private final MetastoreManager metastoreManager;
+  private final SQLMetadataConnector connector;
+  private final ObjectMapper jsonMapper;
+  private final IDBI dbi;
+  private final String tableName;
+  private final Deque<Listener> listeners = new ConcurrentLinkedDeque<>();
+
+  @Inject
+  public SQLCatalogManager(MetastoreManager metastoreManager)
+  {
+    if (!metastoreManager.isSql()) {
+      throw new ISE("SQLCatalogManager only works with SQL based metadata store at this time");
+    }
+    this.metastoreManager = metastoreManager;
+    this.connector = metastoreManager.sqlConnector();
+    this.dbi = connector.getDBI();
+    this.jsonMapper = metastoreManager.jsonMapper();
+    this.tableName = getTableDefnTable();
+  }
+
+  @Override
+  @LifecycleStart
+  public void start()
+  {
+    createTableDefnTable();
+  }
+
+  @Override
+  public void stop()
+  {
+  }
+
+  // Mimics what MetadataStorageTablesConfig should do.
+  public String getTableDefnTable()
+  {
+    final String base = metastoreManager.tablesConfig().getBase();
+    if (Strings.isNullOrEmpty(base)) {
+      return TABLES_TABLE;
+    } else {
+      return StringUtils.format("%s_%s", base, TABLES_TABLE);
+    }
+  }
+
+  // TODO: Move to SqlMetadataConnector
+  @Override
+  public void createTableDefnTable()
+  {
+    if (!metastoreManager.createTables()) {
+      return;
+    }
+    connector.createTable(
+        tableName,
+        ImmutableList.of(
+            StringUtils.format(
+                "CREATE TABLE %s (\n"
+                + "  schemaName VARCHAR(255) NOT NULL,\n"
+                + "  name VARCHAR(255) NOT NULL,\n"
+                + "  creationTime BIGINT NOT NULL,\n"
+                + "  updateTime BIGINT NOT NULL,\n"
+                + "  state CHAR(1) NOT NULL,\n"
+                + "  payload %s,\n"
+                + "  PRIMARY KEY(schemaName, name)\n"
+                + ")",
+                tableName,
+                connector.getPayloadType())));
+  }
+
+  @Override
+  public long create(TableMetadata table) throws DuplicateKeyException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws DuplicateKeyException
+            {
+              long updateTime = System.currentTimeMillis();
+              Update stmt = handle.createStatement(
+                  StringUtils.format(INSERT_TABLE, tableName)
+              )
+                  .bind("schemaName", table.id().schema())
+                  .bind("name", table.id().name())
+                  .bind("creationTime", updateTime)
+                  .bind("updateTime", updateTime)
+                  .bind("state", TableMetadata.TableState.ACTIVE.code())
+                  .bind("payload", table.spec().toBytes(jsonMapper));
+              try {
+                stmt.execute();
+              }
+              catch (UnableToExecuteStatementException e) {
+                if (DbUtils.isDuplicateRecordException(e)) {
+                  throw new DuplicateKeyException(
+                        "Tried to insert a duplicate table: " + table.sqlName(),
+                        e);
+                } else {
+                  throw e;
+                }
+              }
+              sendAddition(table, updateTime);
+              return updateTime;
+            }
+          }
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (e.getCause() instanceof DuplicateKeyException) {
+        throw (DuplicateKeyException) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public TableMetadata read(TableId id)
+  {
+    return dbi.withHandle(
+        new HandleCallback<TableMetadata>()
+        {
+          @Override
+          public TableMetadata withHandle(Handle handle)
+          {
+            Query<Map<String, Object>> query = handle.createQuery(
+                StringUtils.format(SELECT_TABLE, tableName)
+            )
+                .setFetchSize(connector.getStreamingFetchSize())
+                .bind("schemaName", id.schema())
+                .bind("name", id.name());
+            final ResultIterator<TableMetadata> resultIterator =
+                query.map((index, r, ctx) ->
+                  new TableMetadata(
+                      id,
+                      r.getLong(1),
+                      r.getLong(2),
+                      TableMetadata.TableState.fromCode(r.getString(3)),
+                      TableSpec.fromBytes(jsonMapper, r.getBytes(4))
+                  ))
+                .iterator();
+            if (resultIterator.hasNext()) {
+              return resultIterator.next();
+            }
+            return null;
+          }
+        }
+    );
+  }
+
+  @Override
+  public long update(TableMetadata table, long oldVersion) throws OutOfDateException, NotFoundException
+  {
+    if (oldVersion == 0) {
+      return updateUnsafe(table.id(), table.spec());
+    } else {
+      return updateSafe(table.id(), table.spec(), oldVersion);
+    }
+  }
+
+  private long updateSafe(TableId id, TableSpec defn, long oldVersion) throws OutOfDateException
+  {
+    try {
+      return dbi.withHandle(
+          new HandleCallback<Long>()
+          {
+            @Override
+            public Long withHandle(Handle handle) throws OutOfDateException
+            {
+              long updateTime = System.currentTimeMillis();
+              int updateCount = handle.createStatement(
+                  StringUtils.format(UPDATE_DEFN_SAFE, tableName))
+                  .bind("schemaName", id.schema())
+                  .bind("name", id.name())
+                  .bind("payload", defn.toBytes(jsonMapper))
+                  .bind("updateTime", updateTime)
+                  .bind("oldVersion", oldVersion)
+                  .execute();
+              if (updateCount == 0) {
+                throw new OutOfDateException(

Review Comment:
   To do a coherent check, we'd have to do the update and then a follow-up check within a transaction, which seems overkill for an obscure state. Instead, I've generalized the error message. It is the _client_, not the server, that has to do a read: to see if the table still exists, to get the latest version, and to see if the table is still active. With that, the client can redo its attempted update.
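   The client-side flow described above can be sketched as a read-then-retry loop. `Catalog`, `Snapshot`, `updateWithRetry`, and `RacyCatalog` are hypothetical names, not Druid APIs; `RacyCatalog` simulates one concurrent writer that wins the first race, so the loop has to re-read once before its update succeeds.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch of a client that re-reads and retries after a version mismatch.
final class ClientRetrySketch
{
  static class OutOfDateException extends Exception
  {
    OutOfDateException(String msg)
    {
      super(msg);
    }
  }

  // What the client learns from a read: definition, version, and whether the table is active.
  static final class Snapshot
  {
    final String spec;
    final long version;
    final boolean active;

    Snapshot(String spec, long version, boolean active)
    {
      this.spec = spec;
      this.version = version;
      this.active = active;
    }
  }

  interface Catalog
  {
    Snapshot read(String name); // null if the table does not exist

    long updateIfVersion(String name, String spec, long expectedVersion) throws OutOfDateException;
  }

  // Returns the new version; throws IllegalStateException when retrying cannot help.
  static long updateWithRetry(Catalog catalog, String name, UnaryOperator<String> transform, int maxAttempts)
  {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Snapshot current = catalog.read(name);
      if (current == null) {
        throw new IllegalStateException("Table " + name + ": not found");
      }
      if (!current.active) {
        throw new IllegalStateException("Table " + name + " is not active");
      }
      try {
        return catalog.updateIfVersion(name, transform.apply(current.spec), current.version);
      }
      catch (OutOfDateException e) {
        // Someone else updated the table since our read: loop to re-read and retry.
      }
    }
    throw new IllegalStateException("Table " + name + ": gave up after " + maxAttempts + " attempts");
  }

  // In-memory catalog (single table; name ignored) where another writer wins the first race.
  static final class RacyCatalog implements Catalog
  {
    String spec;
    long version = 1;
    int updateCalls = 0;

    RacyCatalog(String spec)
    {
      this.spec = spec;
    }

    @Override
    public Snapshot read(String name)
    {
      return new Snapshot(spec, version, true);
    }

    @Override
    public long updateIfVersion(String name, String newSpec, long expectedVersion) throws OutOfDateException
    {
      updateCalls++;
      if (updateCalls == 1) {
        version++; // a concurrent client sneaks in an update first
      }
      if (expectedVersion != version) {
        throw new OutOfDateException("update version does not match DB version");
      }
      spec = newSpec;
      return ++version;
    }
  }
}
```

   Capping the number of attempts keeps a heavily contended table from looping forever, while a missing or inactive table fails immediately because re-reading cannot fix it.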



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
