kfaraz commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1568275079


##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()

Review Comment:
   ```suggestion
     public List<Long> findReferencedSchemaIdsMarkedAsUnused()
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_id) FROM %s WHERE used = 
true AND schema_id IN (SELECT id FROM %s WHERE used = false)",

Review Comment:
   Why only look at segments with `used = true`? Even if a schema is referenced 
only by unused segments, the schema itself should still be marked as `used = 
true`.



##########
services/src/main/java/org/apache/druid/cli/CliHistorical.java:
##########
@@ -101,6 +101,8 @@ protected List<? extends Module> getModules()
         new JoinableFactoryModule(),
         new HistoricalServiceModule(),
         binder -> {
+          
CliCoordinator.validateCentralizedDatasourceSchemaConfig(getProperties());

Review Comment:
   No, only the services which actually use that config should even be aware of 
it.



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_id) FROM %s WHERE used = 
true AND schema_id IN (SELECT id FROM %s WHERE used = false)",
+                          dbTables.getSegmentsTable(),
+                          dbTables.getSegmentSchemasTable()
+                      ))
+                  .mapTo(Long.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaUsed(List<Long> schemaIds)
+  {
+    String inClause = 
schemaIds.stream().map(Object::toString).collect(Collectors.joining(","));
+
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = true, used_status_last_updated 
= :now"
+                          + " WHERE id IN (%s)",
+                          dbTables.getSegmentSchemasTable(), inClause
+                      )
+                  )
+                  .bind("now", DateTimes.nowUtc().toString())
+                  .execute()
+    );
+  }
+
+  public int deleteSchemasOlderThan(long timestamp)
+  {
+    return connector.retryWithHandle(
+        handle -> handle.createStatement(
+                            StringUtils.format(
+                                "DELETE FROM %s WHERE used = false AND 
used_status_last_updated < :now",
+                                dbTables.getSegmentSchemasTable()
+                            ))
+                        .bind("now", DateTimes.utc(timestamp).toString())
+                        .execute());
+  }
+
+  public int identifyAndMarkSchemaUnused()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = false, 
used_status_last_updated = :now  WHERE used != false "
+                          + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s 
WHERE used=true AND schema_id IS NOT NULL)",
+                          dbTables.getSegmentSchemasTable(),
+                          dbTables.getSegmentsTable()
+                      )
+                  )
+                  .bind("now", DateTimes.nowUtc().toString())
+                  .execute());
+  }
+
+  public String generateSchemaPayloadFingerprint(SchemaPayload payload)
+  {
+    return fingerprintGenerator.generateFingerprint(payload);
+  }
+
+  /**
+   * Persist segment schema and update segments in a transaction.
+   */
+  public void persistSchemaAndUpdateSegmentsTable(String dataSource, 
List<SegmentSchemaMetadataPlus> segmentSchemas, int schemaVersion)
+  {
+    connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> 
{
+      Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();
+
+      for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) {
+        schemaPayloadMap.put(
+            segmentSchema.getFingerprint(),
+            segmentSchema.getSegmentSchemaMetadata().getSchemaPayload()
+        );
+      }
+      persistSegmentSchema(handle, dataSource, schemaPayloadMap, 
schemaVersion);
+      updateSegmentWithSchemaInformation(handle, dataSource, schemaVersion, 
segmentSchemas);
+
+      return null;
+    }, 1, 3);
+  }
+
+  /**
+   * Persist unique segment schema in the DB.
+   */
+  public void persistSegmentSchema(
+      Handle handle,
+      String dataSource,
+      Map<String, SchemaPayload> fingerprintSchemaPayloadMap,
+      int schemaVersion
+  ) throws JsonProcessingException
+  {
+    try {
+      // Filter already existing schema
+      Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = 
fingerprintExistBatch(handle, dataSource, schemaVersion, 
fingerprintSchemaPayloadMap.keySet());
+      Set<String> usedExistingFingerprints = 
existingFingerprintsAndUsedStatus.containsKey(true) ? 
existingFingerprintsAndUsedStatus.get(true) : new HashSet<>();
+      Set<String> unusedExistingFingerprints = 
existingFingerprintsAndUsedStatus.containsKey(false) ? 
existingFingerprintsAndUsedStatus.get(false) : new HashSet<>();
+      Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, 
unusedExistingFingerprints);
+      if (existingFingerprints.size() > 0) {
+        log.info(
+            "Found already existing schema in the DB for dataSource [%1$s]. 
Used fingerprints: [%2$s], Unused fingerprints: [%3$s]",
+            dataSource,
+            usedExistingFingerprints,
+            unusedExistingFingerprints
+        );
+      }
+
+      // There is a possibility of race with schema cleanup Coordinator duty.
+      // The duty could delete the unused schema. We try to mark them used.
+      // However, if the duty succeeds in deleting it the transaction fails 
due to consistency guarantees.
+      // The failed transaction is retried.
+      // Since the deletion period would be at least > 1h, we are sure that 
the race wouldn't arise on retry.
+      // There is another race, wherein used schema could be marked as unused 
by the cleanup duty.
+      // The implication is that a segment could reference an unused schema. 
Since there is a significant gap
+      // between marking the segment as unused and deletion, the schema won't 
be lost during retry.
+      // There is no functional problem as such, since the duty would itself 
mark those schema as used.

Review Comment:
   Rather than this long comment, just direct a reader to the javadocs of 
`KillUnreferencedSegmentSchemaDuty` and make sure to capture all the details 
there.



##########
processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchemas.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Encapsulates segment metadata and corresponding schema.
+ */
+public class DataSegmentWithSchemas

Review Comment:
   ```suggestion
   public class DataSegmentsWithSchemas
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_id) FROM %s WHERE used = 
true AND schema_id IN (SELECT id FROM %s WHERE used = false)",
+                          dbTables.getSegmentsTable(),
+                          dbTables.getSegmentSchemasTable()
+                      ))
+                  .mapTo(Long.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaUsed(List<Long> schemaIds)

Review Comment:
   ```suggestion
     public int markSchemaIdsAsUsed(List<Long> schemaIds)
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_id) FROM %s WHERE used = 
true AND schema_id IN (SELECT id FROM %s WHERE used = false)",
+                          dbTables.getSegmentsTable(),
+                          dbTables.getSegmentSchemasTable()
+                      ))
+                  .mapTo(Long.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaUsed(List<Long> schemaIds)
+  {
+    String inClause = 
schemaIds.stream().map(Object::toString).collect(Collectors.joining(","));

Review Comment:
   Immediately return 0 if `schemaIds` is null or empty.



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_id) FROM %s WHERE used = 
true AND schema_id IN (SELECT id FROM %s WHERE used = false)",
+                          dbTables.getSegmentsTable(),
+                          dbTables.getSegmentSchemasTable()
+                      ))
+                  .mapTo(Long.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaUsed(List<Long> schemaIds)
+  {
+    String inClause = 
schemaIds.stream().map(Object::toString).collect(Collectors.joining(","));
+
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = true, used_status_last_updated 
= :now"
+                          + " WHERE id IN (%s)",
+                          dbTables.getSegmentSchemasTable(), inClause
+                      )
+                  )
+                  .bind("now", DateTimes.nowUtc().toString())
+                  .execute()
+    );
+  }
+
+  public int deleteSchemasOlderThan(long timestamp)
+  {
+    return connector.retryWithHandle(
+        handle -> handle.createStatement(
+                            StringUtils.format(
+                                "DELETE FROM %s WHERE used = false AND 
used_status_last_updated < :now",
+                                dbTables.getSegmentSchemasTable()
+                            ))
+                        .bind("now", DateTimes.utc(timestamp).toString())
+                        .execute());
+  }
+
+  public int identifyAndMarkSchemaUnused()

Review Comment:
   ```suggestion
     public int markUnreferencedSchemasAsUnused()
   ```



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  /**
+   * @param dbTables             metadata store table names (segments and segment-schemas tables)
+   * @param jsonMapper           mapper used to serialize schema payloads
+   * @param connector            SQL connector used for all metadata store access
+   * @param fingerprintGenerator generates fingerprints for schema payloads
+   */
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_id) FROM %s WHERE used = 
true AND schema_id IN (SELECT id FROM %s WHERE used = false)",
+                          dbTables.getSegmentsTable(),
+                          dbTables.getSegmentSchemasTable()
+                      ))
+                  .mapTo(Long.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaUsed(List<Long> schemaIds)
+  {
+    String inClause = 
schemaIds.stream().map(Object::toString).collect(Collectors.joining(","));
+
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = true, used_status_last_updated 
= :now"
+                          + " WHERE id IN (%s)",
+                          dbTables.getSegmentSchemasTable(), inClause
+                      )
+                  )
+                  .bind("now", DateTimes.nowUtc().toString())
+                  .execute()
+    );
+  }
+
+  public int deleteSchemasOlderThan(long timestamp)
+  {
+    return connector.retryWithHandle(
+        handle -> handle.createStatement(
+                            StringUtils.format(
+                                "DELETE FROM %s WHERE used = false AND 
used_status_last_updated < :now",
+                                dbTables.getSegmentSchemasTable()
+                            ))
+                        .bind("now", DateTimes.utc(timestamp).toString())
+                        .execute());
+  }
+
+  public int identifyAndMarkSchemaUnused()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = false, 
used_status_last_updated = :now  WHERE used != false "
+                          + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s 
WHERE used=true AND schema_id IS NOT NULL)",
+                          dbTables.getSegmentSchemasTable(),
+                          dbTables.getSegmentsTable()
+                      )
+                  )
+                  .bind("now", DateTimes.nowUtc().toString())
+                  .execute());
+  }
+
+  public String generateSchemaPayloadFingerprint(SchemaPayload payload)

Review Comment:
   Why do this through the `SegmentSchemaManager`? This method is being used 
only in the backfill queue.



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                      StringUtils.format(
+                          "SELECT DISTINCT(schema_id) FROM %s WHERE used = 
true AND schema_id IN (SELECT id FROM %s WHERE used = false)",
+                          dbTables.getSegmentsTable(),
+                          dbTables.getSegmentSchemasTable()
+                      ))
+                  .mapTo(Long.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaUsed(List<Long> schemaIds)
+  {
+    String inClause = 
schemaIds.stream().map(Object::toString).collect(Collectors.joining(","));

Review Comment:
   It seems that joined ids will not be quoted in single-quotes. Please make 
sure you test out these methods in a local cluster.



##########
server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.timeline.SegmentId;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.TransactionCallback;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Handles segment schema persistence and cleanup.
+ */
+@LazySingleton
+public class SegmentSchemaManager
+{
+  private static final EmittingLogger log = new 
EmittingLogger(SegmentSchemaManager.class);
+  private static final int DB_ACTION_PARTITION_SIZE = 100;
+  private final MetadataStorageTablesConfig dbTables;
+  private final ObjectMapper jsonMapper;
+  private final SQLMetadataConnector connector;
+  private final FingerprintGenerator fingerprintGenerator;
+
+  @Inject
+  public SegmentSchemaManager(
+      MetadataStorageTablesConfig dbTables,
+      ObjectMapper jsonMapper,
+      SQLMetadataConnector connector,
+      FingerprintGenerator fingerprintGenerator
+  )
+  {
+    this.dbTables = dbTables;
+    this.jsonMapper = jsonMapper;
+    this.connector = connector;
+    this.fingerprintGenerator = fingerprintGenerator;
+  }
+
+  /**
+   * Returns ids of schemas that are marked unused in the schemas table but are
+   * still referenced by at least one used segment. Such schemas should be
+   * marked used again via {@link #markSchemaUsed}.
+   */
+  public List<Long> identifyReferencedUnusedSchema()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createQuery(
+                StringUtils.format(
+                    "SELECT DISTINCT(schema_id) FROM %s WHERE used = true AND 
schema_id IN (SELECT id FROM %s WHERE used = false)",
+                    dbTables.getSegmentsTable(), 
dbTables.getSegmentSchemasTable()
+                ))
+                  .mapTo(Long.class)
+                  .list()
+    );
+  }
+
+  public int markSchemaUsed(List<Long> schemaIds)
+  {
+    String inClause = 
schemaIds.stream().map(Object::toString).collect(Collectors.joining(","));
+
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = true, used_status_last_updated 
= :created_date"
+                          + " WHERE id IN (%s)",
+                          dbTables.getSegmentSchemasTable(), inClause
+                      )
+                  )
+                  .bind("created_date", DateTimes.nowUtc().toString())
+                  .execute()
+    );
+  }
+
+  public int deleteSchemasOlderThan(long timestamp)
+  {
+    String dateTime = DateTimes.utc(timestamp).toString();
+    return connector.retryWithHandle(
+        handle -> handle.createStatement(
+                            StringUtils.format(
+                                "DELETE FROM %s WHERE used = false AND 
used_status_last_updated < :date_time",
+                                dbTables.getSegmentSchemasTable()
+                            ))
+                        .bind("date_time", dateTime)
+                        .execute());
+  }
+
+  public int identifyAndMarkSchemaUnused()
+  {
+    return connector.retryWithHandle(
+        handle ->
+            handle.createStatement(
+                      StringUtils.format(
+                          "UPDATE %s SET used = false, 
used_status_last_updated = :created_date  WHERE used != false "
+                          + "AND id NOT IN (SELECT DISTINCT(schema_id) FROM %s 
WHERE used=true AND schema_id IS NOT NULL)",
+                          dbTables.getSegmentSchemasTable(),
+                          dbTables.getSegmentsTable()
+                      )
+                  )
+                  .bind("created_date", DateTimes.nowUtc().toString())
+                  .execute());
+  }
+
+  /**
+   * Generates a fingerprint for the given schema payload by delegating to
+   * {@link FingerprintGenerator}.
+   * <p>
+   * NOTE(review): this is a thin pass-through; callers could likely inject
+   * {@code FingerprintGenerator} directly instead — confirm with usages.
+   */
+  public String generateSchemaPayloadFingerprint(SchemaPayload payload)
+  {
+    return fingerprintGenerator.generateFingerprint(payload);
+  }
+
+  /**
+   * Persist segment schema and update segments in a transaction.
+   * <p>
+   * Builds a fingerprint-to-payload map from the given schemas, persists the
+   * unique schemas, then updates the segments table with schema information,
+   * all within a single retried transaction.
+   *
+   * @param dataSource     datasource the segments belong to
+   * @param segmentSchemas schemas (with fingerprints) to persist
+   * @param schemaVersion  version tag stored alongside each schema
+   */
+  public void persistSchemaAndUpdateSegmentsTable(String dataSource, 
List<SegmentSchemaMetadataPlus> segmentSchemas, String schemaVersion)
+  {
+    connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> 
{
+      Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();
+
+      for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) {
+        schemaPayloadMap.put(
+            segmentSchema.getFingerprint(),
+            segmentSchema.getSegmentSchemaMetadata().getSchemaPayload()
+        );
+      }
+      persistSegmentSchema(handle, dataSource, schemaPayloadMap, 
schemaVersion);
+      updateSegmentWithSchemaInformation(handle, dataSource, schemaVersion, 
segmentSchemas);
+
+      return null;
+      // trailing args: quietTries = 1, maxTries = 3 for the retried transaction
+    }, 1, 3);
+  }
+
+  /**
+   * Persist unique segment schema in the DB.
+   */
+  public void persistSegmentSchema(
+      Handle handle,
+      String dataSource,
+      Map<String, SchemaPayload> fingerprintSchemaPayloadMap,
+      String schemaVersion
+  ) throws JsonProcessingException
+  {
+    try {
+      // Filter already existing schema
+      Map<Boolean, Set<String>> existingFingerprintsAndUsedStatus = 
fingerprintExistBatch(handle, dataSource, schemaVersion, 
fingerprintSchemaPayloadMap.keySet());
+      Set<String> usedExistingFingerprints = 
existingFingerprintsAndUsedStatus.containsKey(true) ? 
existingFingerprintsAndUsedStatus.get(true) : new HashSet<>();
+      Set<String> unusedExistingFingerprints = 
existingFingerprintsAndUsedStatus.containsKey(false) ? 
existingFingerprintsAndUsedStatus.get(false) : new HashSet<>();
+      Set<String> existingFingerprints = Sets.union(usedExistingFingerprints, 
unusedExistingFingerprints);
+      if (existingFingerprints.size() > 0) {
+        log.info(

Review Comment:
   Emitting a metric in the middle of a transaction is probably not a good 
idea. The result returned by the methods should be used after the transaction 
has finished to emit any required metrics.



##########
server/src/main/java/org/apache/druid/segment/metadata/KillUnreferencedSegmentSchemas.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import com.google.inject.Inject;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+
+import java.util.List;
+
+/**
+ * This class deals with cleaning schema which is not referenced by any used 
segment.
+ * <p>
+ * <ol>
+ * <li>If a schema is not referenced, UPDATE schemas SET used = false, 
used_status_last_updated = now</li>
+ * <li>DELETE FROM schemas WHERE used = false AND used_status_last_updated < 6 
hours ago</li>
+ * <li>When creating a new segment, try to find schema for the fingerprint of 
the segment.</li>
+ *    <ol type="a">
+ *    <li> If no record found, create a new one.</li>
+ *    <li> If record found which has used = true, reuse this schema_id.</li>
+ *    <li> If record found which has used = false, UPDATE SET used = true, 
used_status_last_updated = now</li>
+ *    </ol>
+ * </ol>
+ * </p>
+ * <p>
+ * Possible race conditions:
+ *    <ol type="a">
+ *    <li> Between ops 1 and 3b: In other words, we might end up with a 
segment that points to a schema that has just been marked as unused. This can 
be repaired by the coordinator duty. </li>
+ *    <li> Between 2 and 3c: This can be handled. Either 2 will fail to update 
any rows (good case) or 3c will fail to update any rows and thus return 0 (bad 
case). In the bad case, we need to recreate the schema, same as step 3a. </li>
+ *    </ol>
+ * </p>
+ */
+@LazySingleton
+public class KillUnreferencedSegmentSchemas

Review Comment:
   No, that goes against the clean duty design that we currently have. All the 
duties (except maybe `CompactSegments`) are instantiated by the coordinator 
itself rather than being injected. If we need more dependencies in the duty in 
the future, so be it. There is no harm in adding new arguments to the 
`DruidCoordinator` constructor. In fact, it might even be preferred as it gives 
a clear picture of the things that the coordinator needs.
   
   But as it turns out, in the current case, that might not even be needed. The 
only new dependency required by the new duty is `SegmentSchemaManager`. You can 
inject it into `MetadataManager` and access it the same way that other metadata 
manager tools inside it are being accessed. This way, you won't even have to 
add the argument to the `DruidCoordinator` constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to