This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8be787a51bb Allow use of centralized datasource schema and segment 
metadata cache together (#17996)
8be787a51bb is described below

commit 8be787a51bbd695dbe83127002fba32e2ed4025f
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon May 19 22:28:56 2025 +0530

    Allow use of centralized datasource schema and segment metadata cache 
together (#17996)
    
    Description
    -----------
    #17935 enables use of `HeapMemorySegmentMetadataCache` on the Coordinator.
    But it cannot be used in conjunction with centralized datasource schema 
(i.e. `SegmentSchemaCache`).
    This patch supports usage of both features on the Coordinator together.
    
    Main Changes
    --------------
    - Make `SegmentSchemaCache` a dependency of `HeapMemorySegmentMetadataCache`
    - Bind `SegmentMetadataCache` and `SegmentSchemaCache` in 
`MetadataManagerModule`
    - Add `NoopSegmentSchemaCache` to be used on the Overlord
    - Poll schemas in `HeapMemorySegmentMetadataCache` and update 
`SegmentSchemaCache`
    - Update the `used_status_last_updated` column of a segment record when its 
schema fingerprint is updated
    
    Fix a race condition
    -------------------
    Add a sync buffer duration of 10 seconds to `HeapMemorySegmentMetadataCache`
    
    - Handles a race condition between sync and insert to cache (caught in 
`CompactionTaskRunTest`)
    - Prevents removal of entries from cache if they have a last updated time 
just before sync start
    and were added to the cache just after sync start
    - This means that non-leader nodes will continue to consider a segment as 
used if it was marked unused
    within 10 seconds of any other update done to it (created, marked used, schema 
info added)
    - 10s is more than enough for this, since the cache is already performing 
as expected in several prod clusters.
    
    Guice changes
    ---------------
    - Add `MetadataManagerModule` used only by Coordinator and Overlord to bind 
metadata managers
    - Restrict `SQLMetadataStorageDruidModule` to bind only SQL connector 
related stuff
---
 .../MaterializedViewSupervisorTest.java            |   2 -
 .../materializedview/DatasourceOptimizerTest.java  |   2 -
 .../indexing/common/actions/TaskActionTestKit.java |   7 +-
 .../indexing/common/task/IngestionTestBase.java    |   8 +-
 .../overlord/TaskLockBoxConcurrencyTest.java       |   3 -
 .../druid/indexing/overlord/TaskLockboxTest.java   |   3 -
 .../indexing/overlord/TaskQueueScaleTest.java      |   3 -
 .../SeekableStreamIndexTaskTestBase.java           |   2 -
 ...ose.cds-coordinator-metadata-query-disabled.yml |   4 +-
 ...er-compose.cds-task-schema-publish-disabled.yml |   4 +-
 ...ocker-compose.centralized-datasource-schema.yml |   4 +-
 .../apache/druid/guice/MetadataConfigModule.java   |  28 ++-
 .../druid/guice/SQLMetadataStorageDruidModule.java |  56 ------
 .../IndexerSQLMetadataStorageCoordinator.java      |  14 +-
 .../SqlSegmentsMetadataManagerProvider.java        |   4 +-
 .../druid/metadata/SqlSegmentsMetadataQuery.java   | 110 +++++++++++
 ...lSegmentMetadataReadOnlyTransactionFactory.java | 114 +++++++++++
 .../SqlSegmentMetadataTransactionFactory.java      |  62 +-----
 .../segment/SqlSegmentsMetadataManagerV2.java      |  15 +-
 .../cache/HeapMemoryDatasourceSegmentCache.java    |   8 +-
 .../cache/HeapMemorySegmentMetadataCache.java      | 218 +++++++++++++++++++--
 .../druid/metadata/segment/cache/Metric.java       |  10 +
 .../segment/cache/SegmentSchemaRecord.java         |  47 +++++
 .../segment/metadata/NoopSegmentSchemaCache.java   | 133 +++++++++++++
 .../druid/segment/metadata/SegmentSchemaCache.java |  33 +++-
 .../segment/metadata/SegmentSchemaManager.java     |  26 ++-
 ...rSQLMetadataStorageCoordinatorMarkUsedTest.java |   2 -
 ...rSQLMetadataStorageCoordinatorReadOnlyTest.java |  29 ++-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |  15 +-
 ...ataStorageCoordinatorSchemaPersistenceTest.java |   6 +-
 ...dexerSqlMetadataStorageCoordinatorTestBase.java |  50 +++--
 .../SqlSegmentsMetadataManagerProviderTest.java    |   2 +-
 .../segment/SqlSegmentsMetadataManagerV2Test.java  |  63 +++---
 .../cache/HeapMemorySegmentMetadataCacheTest.java  | 165 +++++++++++++++-
 .../server/coordinator/CreateDataSegments.java     |  18 +-
 .../duty/KillUnreferencedSegmentSchemaTest.java    |   9 +-
 .../java/org/apache/druid/cli/CliCoordinator.java  |  20 +-
 .../java/org/apache/druid/cli/CliOverlord.java     |   9 +-
 .../main/java/org/apache/druid/cli/CliPeon.java    |  31 +--
 .../java/org/apache/druid/cli/ServerRunnable.java  |  14 +-
 .../apache/druid/guice/MetadataManagerModule.java  | 137 +++++++++++++
 .../druid/guice/SegmentSchemaCacheModule.java      |   2 -
 42 files changed, 1138 insertions(+), 354 deletions(-)

diff --git 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 62b3f4af3d7..01964ce2826 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Sets;
 import junit.framework.AssertionFailedError;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.error.EntryAlreadyExists;
 import org.apache.druid.indexer.HadoopIOConfig;
 import org.apache.druid.indexer.HadoopIngestionSpec;
@@ -113,7 +112,6 @@ public class MaterializedViewSupervisorTest
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             derbyConnector,
             new TestDruidLeaderSelector(),
-            Set.of(NodeRole.OVERLORD),
             NoopSegmentMetadataCache.instance(),
             NoopServiceEmitter.instance()
         ),
diff --git 
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
 
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
index 755b0efddac..1ec98359db7 100644
--- 
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
+++ 
b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java
@@ -37,7 +37,6 @@ import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
 import org.apache.druid.client.selector.RandomServerSelectorStrategy;
 import org.apache.druid.curator.CuratorTestBase;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
@@ -123,7 +122,6 @@ public class DatasourceOptimizerTest extends CuratorTestBase
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             derbyConnector,
             new TestDruidLeaderSelector(),
-            Set.of(NodeRole.OVERLORD),
             NoopSegmentMetadataCache.instance(),
             NoopServiceEmitter.instance()
         ),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index f5a28d5d67c..3a63495928a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.actions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
@@ -40,6 +39,7 @@ import 
org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
 import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
 import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
 import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
@@ -49,8 +49,6 @@ import org.easymock.EasyMock;
 import org.joda.time.Period;
 import org.junit.rules.ExternalResource;
 
-import java.util.Set;
-
 public class TaskActionTestKit extends ExternalResource
 {
   private final MetadataStorageTablesConfig metadataStorageTablesConfig = 
MetadataStorageTablesConfig.fromBase("druid");
@@ -172,10 +170,12 @@ public class TaskActionTestKit extends ExternalResource
         = useSegmentMetadataCache
           ? SegmentMetadataCache.UsageMode.ALWAYS
           : SegmentMetadataCache.UsageMode.NEVER;
+
     segmentMetadataCache = new HeapMemorySegmentMetadataCache(
         objectMapper,
         Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
         Suppliers.ofInstance(metadataStorageTablesConfig),
+        new NoopSegmentSchemaCache(),
         testDerbyConnector,
         (poolSize, name) -> new WrappingScheduledExecutorService(name, 
metadataCachePollExec, false),
         NoopServiceEmitter.instance()
@@ -189,7 +189,6 @@ public class TaskActionTestKit extends ExternalResource
         metadataStorageTablesConfig,
         testDerbyConnector,
         leaderSelector,
-        Set.of(NodeRole.OVERLORD),
         segmentMetadataCache,
         NoopServiceEmitter.instance()
     )
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 9135b097578..848a2bb91ce 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -34,7 +34,6 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.RegexInputFormat;
 import org.apache.druid.data.input.impl.RegexParseSpec;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.report.IngestionStatsAndErrors;
 import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
@@ -136,6 +135,7 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
   private SupervisorManager supervisorManager;
   private TestDataSegmentKiller dataSegmentKiller;
   private SegmentMetadataCache segmentMetadataCache;
+  private SegmentSchemaCache segmentSchemaCache;
   protected File reportsFile;
 
   protected IngestionTestBase()
@@ -168,6 +168,7 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
         derbyConnectorRule.getConnector()
     );
 
+    segmentSchemaCache = new SegmentSchemaCache();
     storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
         createTransactionFactory(),
         objectMapper,
@@ -176,14 +177,13 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
         segmentSchemaManager,
         CentralizedDatasourceSchemaConfig.create()
     );
-    SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache();
     segmentsMetadataManager = new SqlSegmentsMetadataManagerV2(
         segmentMetadataCache,
         segmentSchemaCache,
         derbyConnectorRule.getConnector(),
         () -> new SegmentsMetadataManagerConfig(null, null),
         derbyConnectorRule.metadataTablesConfigSupplier(),
-        CentralizedDatasourceSchemaConfig.create(),
+        CentralizedDatasourceSchemaConfig::create,
         NoopServiceEmitter.instance(),
         objectMapper
     );
@@ -325,6 +325,7 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
         objectMapper,
         Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
         derbyConnectorRule.metadataTablesConfigSupplier(),
+        segmentSchemaCache,
         derbyConnectorRule.getConnector(),
         ScheduledExecutors::fixed,
         NoopServiceEmitter.instance()
@@ -338,7 +339,6 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnectorRule.getConnector(),
         leaderSelector,
-        Set.of(NodeRole.OVERLORD),
         segmentMetadataCache,
         NoopServiceEmitter.instance()
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
index a6eb19f4783..a55117aad12 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.overlord;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.guava.SettableSupplier;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
@@ -50,7 +49,6 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -91,7 +89,6 @@ public class TaskLockBoxConcurrencyTest
                 derby.metadataTablesConfigSupplier().get(),
                 derbyConnector,
                 new TestDruidLeaderSelector(),
-                Set.of(NodeRole.OVERLORD),
                 NoopSegmentMetadataCache.instance(),
                 NoopServiceEmitter.instance()
             ),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 8c2c78611ac..48c5b5c4e94 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentLock;
@@ -138,7 +137,6 @@ public class TaskLockboxTest
             tablesConfig,
             derbyConnector,
             new TestDruidLeaderSelector(),
-            Set.of(NodeRole.OVERLORD),
             NoopSegmentMetadataCache.instance(),
             NoopServiceEmitter.instance()
         ),
@@ -481,7 +479,6 @@ public class TaskLockboxTest
             derby.metadataTablesConfigSupplier().get(),
             derbyConnector,
             new TestDruidLeaderSelector(),
-            Set.of(NodeRole.OVERLORD),
             NoopSegmentMetadataCache.instance(),
             NoopServiceEmitter.instance()
         ),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index d73e51e013a..72e0e5ad855 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -70,7 +69,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -113,7 +111,6 @@ public class TaskQueueScaleTest
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             derbyConnectorRule.getConnector(),
             new TestDruidLeaderSelector(),
-            Set.of(NodeRole.OVERLORD),
             NoopSegmentMetadataCache.instance(),
             NoopServiceEmitter.instance()
         ),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 485b6699d1b..e0332708c3f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -47,7 +47,6 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
@@ -599,7 +598,6 @@ public abstract class SeekableStreamIndexTaskTestBase 
extends EasyMockSupport
             derby.metadataTablesConfigSupplier().get(),
             derbyConnector,
             new TestDruidLeaderSelector(),
-            Set.of(NodeRole.OVERLORD),
             NoopSegmentMetadataCache.instance(),
             NoopServiceEmitter.instance()
         ),
diff --git 
a/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
 
b/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
index 60cff61472d..b435c290ef8 100644
--- 
a/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
+++ 
b/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml
@@ -40,7 +40,7 @@ services:
       - druid_centralizedDatasourceSchema_backFillPeriod=15000
       - druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
       - druid_coordinator_segmentMetadata_disableSegmentMetadataQueries=true
-      - druid_manager_segments_useIncrementalCache=never
+      - druid_manager_segments_useIncrementalCache=always
     depends_on:
       - druid-overlord
       - druid-metadata-storage
@@ -53,7 +53,7 @@ services:
     environment:
       - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
       - druid_centralizedDatasourceSchema_enabled=true
-      - druid_manager_segments_useIncrementalCache=never
+      - druid_manager_segments_useIncrementalCache=always
     depends_on:
       - druid-metadata-storage
       - druid-zookeeper-kafka
diff --git 
a/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml 
b/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml
index f7923f368d8..b423ad9f658 100644
--- 
a/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml
+++ 
b/integration-tests/docker/docker-compose.cds-task-schema-publish-disabled.yml
@@ -40,7 +40,7 @@ services:
       - druid_centralizedDatasourceSchema_backFillEnabled=true
       - druid_centralizedDatasourceSchema_backFillPeriod=15000
       - druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
-      - druid_manager_segments_useIncrementalCache=never
+      - druid_manager_segments_useIncrementalCache=always
     depends_on:
       - druid-overlord
       - druid-metadata-storage
@@ -54,7 +54,7 @@ services:
       - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
       - druid_centralizedDatasourceSchema_enabled=true
       - druid_centralizedDatasourceSchema_taskSchemaPublishDisabled=true
-      - druid_manager_segments_useIncrementalCache=never
+      - druid_manager_segments_useIncrementalCache=always
     depends_on:
       - druid-metadata-storage
       - druid-zookeeper-kafka
diff --git 
a/integration-tests/docker/docker-compose.centralized-datasource-schema.yml 
b/integration-tests/docker/docker-compose.centralized-datasource-schema.yml
index 8674a562012..b47ce1533af 100644
--- a/integration-tests/docker/docker-compose.centralized-datasource-schema.yml
+++ b/integration-tests/docker/docker-compose.centralized-datasource-schema.yml
@@ -39,7 +39,7 @@ services:
       - druid_centralizedDatasourceSchema_backFillEnabled=true
       - druid_centralizedDatasourceSchema_backFillPeriod=15000
       - druid_coordinator_segmentMetadata_metadataRefreshPeriod=PT15S
-      - druid_manager_segments_useIncrementalCache=never
+      - druid_manager_segments_useIncrementalCache=always
     depends_on:
       - druid-overlord
       - druid-metadata-storage
@@ -52,7 +52,7 @@ services:
     environment:
       - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
       - druid_centralizedDatasourceSchema_enabled=true
-      - druid_manager_segments_useIncrementalCache=never
+      - druid_manager_segments_useIncrementalCache=always
     depends_on:
       - druid-metadata-storage
       - druid-zookeeper-kafka
diff --git 
a/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java 
b/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
index 1ca4578e560..7a429049db8 100644
--- a/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
+++ b/server/src/main/java/org/apache/druid/guice/MetadataConfigModule.java
@@ -21,29 +21,43 @@ package org.apache.druid.guice;
 
 import com.google.inject.Binder;
 import com.google.inject.Module;
-import org.apache.druid.metadata.MetadataRuleManagerConfig;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
-import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 
+import java.util.Properties;
+
+/**
+ * Binds the following metadata configs for all services:
+ * <ul>
+ * <li>{@link MetadataStorageTablesConfig}</li>
+ * <li>{@link MetadataStorageConnectorConfig}</li>
+ * <li>{@link CentralizedDatasourceSchemaConfig}</li>
+ * </ul>
+ * Ideally, the storage configs should be bound only on Coordinator and 
Overlord,
+ * but they are needed for other services too since metadata storage extensions
+ * are currently loaded on all services.
+ */
 public class MetadataConfigModule implements Module
 {
+  public static final String CENTRALIZED_DATASOURCE_SCHEMA_ENABLED =
+      CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX + ".enabled";
+
   @Override
   public void configure(Binder binder)
   {
     JsonConfigProvider.bind(binder, MetadataStorageTablesConfig.PROPERTY_BASE, 
MetadataStorageTablesConfig.class);
     JsonConfigProvider.bind(binder, 
MetadataStorageConnectorConfig.PROPERTY_BASE, 
MetadataStorageConnectorConfig.class);
 
-    JsonConfigProvider.bind(binder, "druid.manager.segments", 
SegmentsMetadataManagerConfig.class);
-    JsonConfigProvider.bind(binder, "druid.manager.rules", 
MetadataRuleManagerConfig.class);
-
-    // SegmentSchemaCacheConfig needs to be bound on all services since
-    // it is a dependency of SqlSegmentsMetadataManager (both legacy and V2)
     JsonConfigProvider.bind(
         binder,
         CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX,
         CentralizedDatasourceSchemaConfig.class
     );
   }
+
+  public static boolean isSegmentSchemaCacheEnabled(Properties properties)
+  {
+    return 
Boolean.parseBoolean(properties.getProperty(CENTRALIZED_DATASOURCE_SCHEMA_ENABLED));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
 
b/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
index 5f8197d31c3..8ef0f36a07f 100644
--- 
a/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
+++ 
b/server/src/main/java/org/apache/druid/guice/SQLMetadataStorageDruidModule.java
@@ -23,25 +23,10 @@ import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.Module;
 import org.apache.druid.audit.AuditManager;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
-import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
-import org.apache.druid.metadata.MetadataRuleManager;
-import org.apache.druid.metadata.MetadataRuleManagerProvider;
 import org.apache.druid.metadata.MetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.MetadataStorageConnector;
 import org.apache.druid.metadata.MetadataStorageProvider;
-import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.metadata.SQLMetadataConnector;
-import org.apache.druid.metadata.SQLMetadataRuleManager;
-import org.apache.druid.metadata.SQLMetadataRuleManagerProvider;
-import org.apache.druid.metadata.SQLMetadataSupervisorManager;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
-import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
-import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
-import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
-import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
-import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.apache.druid.server.audit.AuditManagerConfig;
 import org.apache.druid.server.audit.AuditSerdeHelper;
 import org.apache.druid.server.audit.SQLAuditManager;
@@ -69,14 +54,7 @@ public class SQLMetadataStorageDruidModule implements Module
     PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(MetadataStorageProvider.class), defaultValue);
     PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(SQLMetadataConnector.class), defaultValue);
 
-    PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(SegmentsMetadataManager.class), defaultValue);
-    PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(SegmentsMetadataManagerProvider.class), defaultValue);
-    PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(MetadataRuleManager.class), defaultValue);
-    PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(MetadataRuleManagerProvider.class), defaultValue);
-    PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(SegmentMetadataTransactionFactory.class), defaultValue);
-    PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(IndexerMetadataStorageCoordinator.class), defaultValue);
     PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(MetadataStorageActionHandlerFactory.class), defaultValue);
-    PolyBind.createChoiceWithDefault(binder, prop, 
Key.get(MetadataSupervisorManager.class), defaultValue);
 
     configureAuditManager(binder);
   }
@@ -84,41 +62,7 @@ public class SQLMetadataStorageDruidModule implements Module
   @Override
   public void configure(Binder binder)
   {
-    PolyBind.optionBinder(binder, 
Key.get(SegmentsMetadataManagerProvider.class))
-            .addBinding(type)
-            .to(SqlSegmentsMetadataManagerProvider.class)
-            .in(LazySingleton.class);
 
-    PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
-            .addBinding(type)
-            .to(SQLMetadataRuleManager.class)
-            .in(LazySingleton.class);
-
-    PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
-            .addBinding(type)
-            .to(SQLMetadataRuleManagerProvider.class)
-            .in(LazySingleton.class);
-
-    // SegmentMetadataCache is bound for all services but is used only by the 
Overlord and Coordinator
-    // similar to some other classes bound here, such as 
IndexerSQLMetadataStorageCoordinator
-    binder.bind(SegmentMetadataCache.class)
-          .to(HeapMemorySegmentMetadataCache.class)
-          .in(LazySingleton.class);
-
-    PolyBind.optionBinder(binder, 
Key.get(SegmentMetadataTransactionFactory.class))
-            .addBinding(type)
-            .to(SqlSegmentMetadataTransactionFactory.class)
-            .in(LazySingleton.class);
-
-    PolyBind.optionBinder(binder, 
Key.get(IndexerMetadataStorageCoordinator.class))
-            .addBinding(type)
-            .to(IndexerSQLMetadataStorageCoordinator.class)
-            .in(ManageLifecycle.class);
-
-    PolyBind.optionBinder(binder, Key.get(MetadataSupervisorManager.class))
-            .addBinding(type)
-            .to(SQLMetadataSupervisorManager.class)
-            .in(LazySingleton.class);
   }
 
   private void configureAuditManager(Binder binder)
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 6ef71cb83de..8ede92dc9c5 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1631,7 +1631,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   private void persistSchema(
       final SegmentMetadataTransaction transaction,
       final Set<DataSegment> segments,
-      final SegmentSchemaMapping segmentSchemaMapping
+      final SegmentSchemaMapping segmentSchemaMapping,
+      final DateTime updateTime
   ) throws JsonProcessingException
   {
     if (segmentSchemaMapping.getSchemaVersion() != 
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) {
@@ -1650,7 +1651,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         transaction.getHandle(),
         dataSource,
         segmentSchemaMapping.getSchemaVersion(),
-        segmentSchemaMapping.getSchemaFingerprintToPayloadMap()
+        segmentSchemaMapping.getSchemaFingerprintToPayloadMap(),
+        updateTime
     );
   }
 
@@ -1662,10 +1664,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   {
     final Set<DataSegment> toInsertSegments = new HashSet<>();
     try {
+      final DateTime createdTime = DateTimes.nowUtc();
       boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);
 
       if (shouldPersistSchema) {
-        persistSchema(transaction, segments, segmentSchemaMapping);
+        persistSchema(transaction, segments, segmentSchemaMapping, 
createdTime);
       }
 
       final Set<SegmentId> segmentIds = 
segments.stream().map(DataSegment::getId).collect(Collectors.toSet());
@@ -1678,7 +1681,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         }
       }
 
-      final DateTime createdTime = DateTimes.nowUtc();
       final Set<DataSegment> usedSegments = 
findNonOvershadowedSegments(segments);
 
       final Set<DataSegmentPlus> segmentPlusToInsert = 
toInsertSegments.stream().map(segment -> {
@@ -1881,8 +1883,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       Map<String, String> upgradedFromSegmentIdMap
   ) throws Exception
   {
+    final DateTime createdTime = DateTimes.nowUtc();
     if (shouldPersistSchema(segmentSchemaMapping)) {
-      persistSchema(transaction, segments, segmentSchemaMapping);
+      persistSchema(transaction, segments, segmentSchemaMapping, createdTime);
     }
 
     // Do not insert segment IDs which already exist
@@ -1892,7 +1895,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         s -> !existingSegmentIds.contains(s.getId().toString())
     ).collect(Collectors.toSet());
 
-    final DateTime createdTime = DateTimes.nowUtc();
     final Set<DataSegmentPlus> segmentPlusToInsert = 
segmentsToInsert.stream().map(segment -> {
       SegmentMetadata segmentMetadata = 
getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
           segment.getId(),
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
index f69dc950808..8491c524226 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProvider.java
@@ -39,7 +39,7 @@ public class SqlSegmentsMetadataManagerProvider implements 
SegmentsMetadataManag
   private final ServiceEmitter serviceEmitter;
   private final SegmentSchemaCache segmentSchemaCache;
   private final SegmentMetadataCache segmentMetadataCache;
-  private final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig;
+  private final Supplier<CentralizedDatasourceSchemaConfig> 
centralizedDatasourceSchemaConfig;
 
   @Inject
   public SqlSegmentsMetadataManagerProvider(
@@ -50,7 +50,7 @@ public class SqlSegmentsMetadataManagerProvider implements 
SegmentsMetadataManag
       SQLMetadataConnector connector,
       Lifecycle lifecycle,
       SegmentSchemaCache segmentSchemaCache,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      Supplier<CentralizedDatasourceSchemaConfig> 
centralizedDatasourceSchemaConfig,
       ServiceEmitter serviceEmitter
   )
   {
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index b1d7d16a145..de2e35813c4 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -39,6 +39,9 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.segment.cache.SegmentSchemaRecord;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
@@ -607,6 +610,91 @@ public class SqlSegmentsMetadataQuery
     return CloseableIterators.wrap(resultIterator, resultIterator);
   }
 
+  /**
+   * Retrieves all used schema fingerprints present in the metadata store.
+   */
+  public Set<String> retrieveAllUsedSegmentSchemaFingerprints()
+  {
+    final String sql = StringUtils.format(
+        "SELECT fingerprint FROM %s WHERE version = %s AND used = true",
+        dbTables.getSegmentSchemasTable(), 
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+    );
+    return Set.copyOf(
+        handle.createQuery(sql)
+              .setFetchSize(connector.getStreamingFetchSize())
+              .mapTo(String.class)
+              .list()
+    );
+  }
+
+  /**
+   * Retrieves all used segment schemas present in the metadata store 
irrespective
+   * of their last updated time.
+   */
+  public List<SegmentSchemaRecord> retrieveAllUsedSegmentSchemas()
+  {
+    final String sql = StringUtils.format(
+        "SELECT fingerprint, payload FROM %s"
+        + " WHERE version = %s AND used = true",
+        dbTables.getSegmentSchemasTable(), 
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+    );
+    return retrieveValidSchemaRecordsWithQuery(handle.createQuery(sql));
+  }
+
+  /**
+   * Retrieves used segment schemas from the metadata store for the given 
fingerprints.
+   */
+  public List<SegmentSchemaRecord> retrieveUsedSegmentSchemasForFingerprints(
+      Set<String> schemaFingerprints
+  )
+  {
+    final List<List<String>> fingerprintBatches = Lists.partition(
+        List.copyOf(schemaFingerprints),
+        MAX_INTERVALS_PER_BATCH
+    );
+
+    final List<SegmentSchemaRecord> records = new ArrayList<>();
+    for (List<String> fingerprintBatch : fingerprintBatches) {
+      records.addAll(
+          retrieveBatchOfSegmentSchemas(fingerprintBatch)
+      );
+    }
+
+    return records;
+  }
+
+  /**
+   * Retrieves a batch of segment schema records for the given fingerprints.
+   */
+  private List<SegmentSchemaRecord> retrieveBatchOfSegmentSchemas(List<String> 
schemaFingerprints)
+  {
+    final String sql = StringUtils.format(
+        "SELECT fingerprint, payload FROM %s"
+        + " WHERE version = %s AND used = true"
+        + " %s",
+        dbTables.getSegmentSchemasTable(),
+        CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
+        getParameterizedInConditionForColumn("fingerprint", schemaFingerprints)
+    );
+
+    final Query<Map<String, Object>> query = handle.createQuery(sql);
+    bindColumnValuesToQueryWithInCondition("fingerprint", schemaFingerprints, 
query);
+
+    return retrieveValidSchemaRecordsWithQuery(query);
+  }
+
+  private List<SegmentSchemaRecord> retrieveValidSchemaRecordsWithQuery(
+      Query<Map<String, Object>> query
+  )
+  {
+    return query.setFetchSize(connector.getStreamingFetchSize())
+                .map((index, r, ctx) -> mapToSchemaRecord(r))
+                .list()
+                .stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+  }
+
   /**
    * Marks the given segment IDs as used.
    *
@@ -1467,6 +1555,28 @@ public class SqlSegmentsMetadataQuery
     }
   }
 
+  /**
+   * Tries to parse the fields of the result set into a {@link 
SegmentSchemaRecord}.
+   *
+   * @return null if an error occurred while parsing the result
+   */
+  @Nullable
+  private SegmentSchemaRecord mapToSchemaRecord(ResultSet resultSet)
+  {
+    String fingerprint = null;
+    try {
+      fingerprint = resultSet.getString("fingerprint");
+      return new SegmentSchemaRecord(
+          fingerprint,
+          jsonMapper.readValue(resultSet.getBytes("payload"), 
SchemaPayload.class)
+      );
+    }
+    catch (Throwable t) {
+      log.error(t, "Could not read segment schema with fingerprint[%s]", 
fingerprint);
+      return null;
+    }
+  }
+
   private ResultIterator<DataSegment> 
getDataSegmentResultIterator(Query<Map<String, Object>> sql)
   {
     return sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, 
r.getBytes(2), DataSegment.class))
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
new file mode 100644
index 00000000000..0d0d1e42a1e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataReadOnlyTransactionFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.segment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SQLMetadataConnector;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.TransactionStatus;
+
+/**
+ * Factory for read-only {@link SegmentMetadataTransaction}s that always read
+ * directly from the metadata store and never from the {@code 
SegmentMetadataCache}.
+ *
+ * @see SqlSegmentMetadataTransactionFactory
+ */
+public class SqlSegmentMetadataReadOnlyTransactionFactory implements 
SegmentMetadataTransactionFactory
+{
+  private static final int QUIET_RETRIES = 2;
+  private static final int MAX_RETRIES = 3;
+
+  private final ObjectMapper jsonMapper;
+  private final MetadataStorageTablesConfig tablesConfig;
+  private final SQLMetadataConnector connector;
+
+  @Inject
+  public SqlSegmentMetadataReadOnlyTransactionFactory(
+      ObjectMapper jsonMapper,
+      MetadataStorageTablesConfig tablesConfig,
+      SQLMetadataConnector connector
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.tablesConfig = tablesConfig;
+    this.connector = connector;
+  }
+
+  public int getMaxRetries()
+  {
+    return MAX_RETRIES;
+  }
+
+  public int getQuietRetries()
+  {
+    return QUIET_RETRIES;
+  }
+
+  @Override
+  public <T> T inReadOnlyDatasourceTransaction(
+      String dataSource,
+      SegmentMetadataReadTransaction.Callback<T> callback
+  )
+  {
+    return connector.retryReadOnlyTransaction(
+        (handle, status) -> {
+          final SegmentMetadataTransaction sqlTransaction
+              = createSqlTransaction(dataSource, handle, status);
+          return executeReadAndClose(sqlTransaction, callback);
+        },
+        QUIET_RETRIES,
+        getMaxRetries()
+    );
+  }
+
+  @Override
+  public <T> T inReadWriteDatasourceTransaction(
+      String dataSource,
+      SegmentMetadataTransaction.Callback<T> callback
+  )
+  {
+    throw DruidException.defensive("Only Overlord can perform write 
transactions on segment metadata.");
+  }
+
+  protected SegmentMetadataTransaction createSqlTransaction(
+      String dataSource,
+      Handle handle,
+      TransactionStatus transactionStatus
+  )
+  {
+    return new SqlSegmentMetadataTransaction(
+        dataSource,
+        handle, transactionStatus, connector, tablesConfig, jsonMapper
+    );
+  }
+
+  protected <T> T executeReadAndClose(
+      SegmentMetadataReadTransaction transaction,
+      SegmentMetadataReadTransaction.Callback<T> callback
+  ) throws Exception
+  {
+    try (transaction) {
+      return callback.inTransaction(transaction);
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
index 2942ed1a086..26083e44b67 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
@@ -23,9 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.discovery.DruidLeaderSelector;
-import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -34,10 +31,6 @@ import org.apache.druid.metadata.SQLMetadataConnector;
 import org.apache.druid.metadata.segment.cache.Metric;
 import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.apache.druid.query.DruidMetrics;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.TransactionStatus;
-
-import java.util.Set;
 
 /**
  * Factory for {@link SegmentMetadataTransaction}s. If the
@@ -56,46 +49,30 @@ import java.util.Set;
  * now, it continues to read directly from the metadata store for consistency
  * with older Druid versions.
  */
-public class SqlSegmentMetadataTransactionFactory implements 
SegmentMetadataTransactionFactory
+public class SqlSegmentMetadataTransactionFactory extends 
SqlSegmentMetadataReadOnlyTransactionFactory
 {
   private static final Logger log = new 
Logger(SqlSegmentMetadataTransactionFactory.class);
 
-  private static final int QUIET_RETRIES = 2;
-  private static final int MAX_RETRIES = 3;
-
-  private final ObjectMapper jsonMapper;
-  private final MetadataStorageTablesConfig tablesConfig;
   private final SQLMetadataConnector connector;
   private final DruidLeaderSelector leaderSelector;
   private final SegmentMetadataCache segmentMetadataCache;
   private final ServiceEmitter emitter;
 
-  private final boolean isNotOverlord;
-
   @Inject
   public SqlSegmentMetadataTransactionFactory(
       ObjectMapper jsonMapper,
       MetadataStorageTablesConfig tablesConfig,
       SQLMetadataConnector connector,
       @IndexingService DruidLeaderSelector leaderSelector,
-      @Self Set<NodeRole> nodeRoles,
       SegmentMetadataCache segmentMetadataCache,
       ServiceEmitter emitter
   )
   {
-    this.jsonMapper = jsonMapper;
-    this.tablesConfig = tablesConfig;
+    super(jsonMapper, tablesConfig, connector);
     this.connector = connector;
     this.leaderSelector = leaderSelector;
     this.segmentMetadataCache = segmentMetadataCache;
     this.emitter = emitter;
-
-    this.isNotOverlord = !nodeRoles.contains(NodeRole.OVERLORD);
-  }
-
-  public int getMaxRetries()
-  {
-    return MAX_RETRIES;
   }
 
   @Override
@@ -109,10 +86,7 @@ public class SqlSegmentMetadataTransactionFactory 
implements SegmentMetadataTran
           final SegmentMetadataTransaction sqlTransaction
               = createSqlTransaction(dataSource, handle, status);
 
-          if (isNotOverlord) {
-            // Read directly from the metadata store if not Overlord
-            return executeReadAndClose(sqlTransaction, callback);
-          } else if (segmentMetadataCache.isSyncedForRead()) {
+          if (segmentMetadataCache.isSyncedForRead()) {
             // Use cache as it is already synced with the metadata store
             emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource);
             return segmentMetadataCache.readCacheForDataSource(dataSource, 
dataSourceCache -> {
@@ -124,7 +98,7 @@ public class SqlSegmentMetadataTransactionFactory implements 
SegmentMetadataTran
             return executeReadAndClose(sqlTransaction, callback);
           }
         },
-        QUIET_RETRIES,
+        getQuietRetries(),
         getMaxRetries()
     );
   }
@@ -135,10 +109,6 @@ public class SqlSegmentMetadataTransactionFactory 
implements SegmentMetadataTran
       SegmentMetadataTransaction.Callback<T> callback
   )
   {
-    if (isNotOverlord) {
-      throw DruidException.defensive("Only Overlord can perform write 
transactions on segment metadata.");
-    }
-
     return connector.retryTransaction(
         (handle, status) -> {
           final SegmentMetadataTransaction sqlTransaction
@@ -167,23 +137,11 @@ public class SqlSegmentMetadataTransactionFactory 
implements SegmentMetadataTran
             return executeWriteAndClose(sqlTransaction, callback);
           }
         },
-        QUIET_RETRIES,
+        getQuietRetries(),
         getMaxRetries()
     );
   }
 
-  private SegmentMetadataTransaction createSqlTransaction(
-      String dataSource,
-      Handle handle,
-      TransactionStatus transactionStatus
-  )
-  {
-    return new SqlSegmentMetadataTransaction(
-        dataSource,
-        handle, transactionStatus, connector, tablesConfig, jsonMapper
-    );
-  }
-
   private <T> T executeWriteAndClose(
       SegmentMetadataTransaction transaction,
       SegmentMetadataTransaction.Callback<T> callback
@@ -201,16 +159,6 @@ public class SqlSegmentMetadataTransactionFactory 
implements SegmentMetadataTran
     }
   }
 
-  private <T> T executeReadAndClose(
-      SegmentMetadataReadTransaction transaction,
-      SegmentMetadataReadTransaction.Callback<T> callback
-  ) throws Exception
-  {
-    try (transaction) {
-      return callback.inTransaction(transaction);
-    }
-  }
-
   private void emitTransactionCount(String metricName, String datasource)
   {
     emitter.emit(
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
index 893950c8362..9d044962dd7 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2.java
@@ -63,7 +63,6 @@ public class SqlSegmentsMetadataManagerV2 implements 
SegmentsMetadataManager
   private final SegmentsMetadataManager delegate;
   private final SegmentMetadataCache segmentMetadataCache;
   private final SegmentsMetadataManagerConfig managerConfig;
-  private final CentralizedDatasourceSchemaConfig schemaConfig;
 
   public SqlSegmentsMetadataManagerV2(
       SegmentMetadataCache segmentMetadataCache,
@@ -71,7 +70,7 @@ public class SqlSegmentsMetadataManagerV2 implements 
SegmentsMetadataManager
       SQLMetadataConnector connector,
       Supplier<SegmentsMetadataManagerConfig> managerConfig,
       Supplier<MetadataStorageTablesConfig> tablesConfig,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      Supplier<CentralizedDatasourceSchemaConfig> 
centralizedDatasourceSchemaConfig,
       ServiceEmitter serviceEmitter,
       ObjectMapper jsonMapper
   )
@@ -79,20 +78,10 @@ public class SqlSegmentsMetadataManagerV2 implements 
SegmentsMetadataManager
     this.delegate = new SqlSegmentsMetadataManager(
         jsonMapper,
         managerConfig, tablesConfig, connector, segmentSchemaCache,
-        centralizedDatasourceSchemaConfig, serviceEmitter
+        centralizedDatasourceSchemaConfig.get(), serviceEmitter
     );
     this.managerConfig = managerConfig.get();
     this.segmentMetadataCache = segmentMetadataCache;
-    this.schemaConfig = centralizedDatasourceSchemaConfig;
-
-    // Segment metadata cache currently cannot handle schema updates
-    if (segmentMetadataCache.isEnabled() && schemaConfig.isEnabled()) {
-      throw new IllegalArgumentException(
-          "Segment metadata incremental 
cache['druid.manager.segments.useIncrementalCache']"
-          + " and segment schema 
cache['druid.centralizedDatasourceSchema.enabled']"
-          + " must not be enabled together."
-      );
-    }
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
index 8b9258a32a5..3a091a6c0b4 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
@@ -224,7 +224,7 @@ class HeapMemoryDatasourceSegmentCache extends 
ReadWriteCache implements AutoClo
    *
    * @param persistedSegmentIds Segment IDs present in the metadata store
    * @param syncStartTime       Start time of the current sync
-   * @return Number of unpersisted segments removed from cache.
+   * @return Number of unpersisted segment IDs removed from the cache.
    */
   private int removeUnpersistedSegments(Set<SegmentId> persistedSegmentIds, 
DateTime syncStartTime)
   {
@@ -725,12 +725,6 @@ class HeapMemoryDatasourceSegmentCache extends 
ReadWriteCache implements AutoClo
              && unusedSegmentIdToUpdatedTime.isEmpty();
     }
 
-    private boolean isSegmentIdCached(SegmentId id)
-    {
-      return idToUsedSegment.containsKey(id)
-             || unusedSegmentIdToUpdatedTime.containsKey(id);
-    }
-
     /**
      * Removes the given segment from the cache.
      *
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
index e954a4f8b4a..2a1cf50133e 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
@@ -22,6 +22,7 @@ package org.apache.druid.metadata.segment.cache;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -48,6 +49,9 @@ import org.apache.druid.metadata.SQLMetadataConnector;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
 import org.apache.druid.server.http.DataSegmentPlus;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -56,6 +60,7 @@ import org.joda.time.Duration;
 import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.TransactionCallback;
 
+import javax.annotation.Nullable;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -70,6 +75,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * In-memory implementation of {@link SegmentMetadataCache}.
@@ -100,6 +107,22 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
   private static final int MIN_SYNC_DELAY_MILLIS = 1000;
   private static final int MAX_IMMEDIATE_SYNC_RETRIES = 3;
 
+  /**
+   * Buffer duration for which entries are kept in the cache even if the
+   * metadata store does not have them. In other words, a segment entry is
+   * removed from cache if the entry is not present in metadata store and has a
+   * {@code lastUpdatedTime < now - bufferWindow}.
+   * <p>
+   * This is primarily needed to handle a race condition between insert and 
sync
+   * where an entry with an updated time just before the sync start is added to
+   * the cache just after the sync has started.
+   * <p>
+   * This means that a non-leader Overlord and all Coordinators will continue to
+   * consider a segment as used if it was marked as unused within the buffer 
period
+   * of a previous update (e.g. segment created, marked used or schema info 
updated).
+   */
+  private static final Duration SYNC_BUFFER_DURATION = 
Duration.standardSeconds(10);
+
   private enum CacheState
   {
     STOPPED, FOLLOWER, LEADER_FIRST_SYNC_PENDING, LEADER_FIRST_SYNC_STARTED, 
LEADER_READY
@@ -111,6 +134,9 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
   private final MetadataStorageTablesConfig tablesConfig;
   private final SQLMetadataConnector connector;
 
+  private final boolean useSchemaCache;
+  private final SegmentSchemaCache segmentSchemaCache;
+
   private final ListeningScheduledExecutorService pollExecutor;
   private final ServiceEmitter emitter;
 
@@ -141,6 +167,7 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
       ObjectMapper jsonMapper,
       Supplier<SegmentsMetadataManagerConfig> config,
       Supplier<MetadataStorageTablesConfig> tablesConfig,
+      SegmentSchemaCache segmentSchemaCache,
       SQLMetadataConnector connector,
       ScheduledExecutorFactory executorFactory,
       ServiceEmitter emitter
@@ -150,6 +177,8 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
     this.cacheMode = config.get().getCacheUsageMode();
     this.pollDuration = config.get().getPollDuration().toStandardDuration();
     this.tablesConfig = tablesConfig.get();
+    this.useSchemaCache = segmentSchemaCache.isEnabled();
+    this.segmentSchemaCache = segmentSchemaCache;
     this.connector = connector;
     this.pollExecutor = isEnabled()
                         ? 
MoreExecutors.listeningDecorator(executorFactory.create(1, 
"SegmentMetadataCache-%s"))
@@ -499,15 +528,18 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
    * <p>
    * The following actions are performed in every sync:
    * <ul>
-   * <li>Retrieve all used and unused segment IDs along with their updated 
timestamps</li>
+   * <li>Retrieve all used segment IDs along with their updated 
timestamps.</li>
+   * <li>Sync segment IDs in the cache with the retrieved segment IDs.</li>
    * <li>Retrieve payloads of used segments which have been updated in the 
metadata
-   * store but not in the cache</li>
-   * <li>Retrieve all pending segments and update the cache as needed</li>
-   * <li>Remove segments not present in the metadata store</li>
-   * <li>Reset the max unused partition IDs</li>
-   * <li>Change the cache state to ready if it is leader and waiting for first 
sync</li>
+   * store but not in the cache.</li>
+   * <li>Retrieve all pending segments and update the cache as needed.</li>
+   * <li>If schema caching is enabled, retrieve segment schemas and reset them
+   * in the {@link SegmentSchemaCache}.</li>
+   * <li>Clean up entries for datasources which have no segments in the cache 
anymore.</li>
+   * <li>Change the cache state to ready if it is leader and waiting for first 
sync.</li>
    * <li>Emit metrics</li>
    * </ul>
   *
    *
    * @return Time taken in milliseconds for the sync to finish
    */
@@ -532,13 +564,18 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
       retrieveAllUsedSegments(datasourceToSummary);
     } else {
       retrieveUsedSegmentIds(datasourceToSummary);
-      updateSegmentIdsInCache(datasourceToSummary, syncStartTime);
+      updateSegmentIdsInCache(datasourceToSummary, 
syncStartTime.minus(SYNC_BUFFER_DURATION));
       retrieveUsedSegmentPayloads(datasourceToSummary);
     }
 
     updateUsedSegmentPayloadsInCache(datasourceToSummary);
     retrieveAllPendingSegments(datasourceToSummary);
-    updatePendingSegmentsInCache(datasourceToSummary, syncStartTime);
+    updatePendingSegmentsInCache(datasourceToSummary, 
syncStartTime.minus(SYNC_BUFFER_DURATION));
+
+    if (useSchemaCache) {
+      retrieveAndResetUsedSegmentSchemas(datasourceToSummary);
+    }
+
     markCacheSynced(syncStartTime);
 
     syncFinishTime.set(DateTimes.nowUtc());
@@ -642,6 +679,16 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
     emitMetric(Metric.RETRIEVE_SEGMENT_IDS_DURATION_MILLIS, 
retrieveDuration.millisElapsed());
   }
 
+  private <T> T query(Function<SqlSegmentsMetadataQuery, T> sqlFunction)
+  {
+    return inReadOnlyTransaction(
+        (handle, status) -> sqlFunction.apply(
+            SqlSegmentsMetadataQuery
+                .forHandle(handle, connector, tablesConfig, jsonMapper)
+        )
+    );
+  }
+
   private <T> T inReadOnlyTransaction(TransactionCallback<T> callback)
   {
     return connector.retryReadOnlyTransaction(callback, SQL_QUIET_RETRIES, 
SQL_MAX_RETRIES);
@@ -668,7 +715,7 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
           CloseableIterator<DataSegmentPlus> iterator =
               SqlSegmentsMetadataQuery
                   .forHandle(handle, connector, tablesConfig, jsonMapper)
-                  .retrieveSegmentsByIdIterator(dataSource, 
segmentIdsToRefresh, false)
+                  .retrieveSegmentsByIdIterator(dataSource, 
segmentIdsToRefresh, useSchemaCache)
       ) {
         iterator.forEachRemaining(summary.usedSegments::add);
         return 0;
@@ -679,7 +726,8 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
   /**
    * Updates the cache of each datasource with the segment IDs fetched from the
    * metadata store in {@link #retrieveUsedSegmentIds}. The update done on each
-   * datasource cache is atomic.
+   * datasource cache is atomic. Also identifies the segment IDs which have 
been
+   * updated in the metadata store and need to be refreshed in the cache.
    */
   private void updateSegmentIdsInCache(
       Map<String, DatasourceSegmentSummary> datasourceToSummary,
@@ -734,10 +782,20 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
   )
   {
     final Stopwatch retrieveDuration = Stopwatch.createStarted();
-    final String sql = StringUtils.format(
-        "SELECT id, payload, created_date, used_status_last_updated FROM %s 
WHERE used = true",
-        tablesConfig.getSegmentsTable()
-    );
+    final String sql;
+    if (useSchemaCache) {
+      sql = StringUtils.format(
+          "SELECT id, payload, created_date, used_status_last_updated, 
schema_fingerprint, num_rows"
+          + " FROM %s WHERE used = true",
+          tablesConfig.getSegmentsTable()
+      );
+    } else {
+      sql = StringUtils.format(
+          "SELECT id, payload, created_date, used_status_last_updated"
+          + " FROM %s WHERE used = true",
+          tablesConfig.getSegmentsTable()
+      );
+    }
 
     final int numSkippedSegments = inReadOnlyTransaction((handle, status) -> {
       try (
@@ -870,11 +928,139 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
     }
   }
 
+  /**
+   * Retrieves required used segment schemas from the metadata store and resets
+   * them in the {@link SegmentSchemaCache}. If this is the first sync, all 
used
+   * schemas are retrieved from the metadata store. If this is a delta sync,
+   * first only the fingerprints of all used schemas are retrieved. Payloads 
are
+   * then fetched for only the fingerprints which are not present in the cache.
+   */
+  private void retrieveAndResetUsedSegmentSchemas(
+      Map<String, DatasourceSegmentSummary> datasourceToSummary
+  )
+  {
+    final Stopwatch schemaSyncDuration = Stopwatch.createStarted();
+
+    // Reset the SegmentSchemaCache with latest schemas and metadata
+    final Map<String, SchemaPayload> schemaFingerprintToPayload;
+    if (syncFinishTime.get() == null) {
+      schemaFingerprintToPayload = 
buildSchemaFingerprintToPayloadMapForFullSync();
+    } else {
+      schemaFingerprintToPayload = 
buildSchemaFingerprintToPayloadMapForDeltaSync();
+    }
+
+    segmentSchemaCache.resetSchemaForPublishedSegments(
+        buildSegmentIdToMetadataMapForSync(datasourceToSummary),
+        schemaFingerprintToPayload
+    );
+
+    // Emit metrics for the current contents of the cache
+    segmentSchemaCache.getStats().forEach(this::emitMetric);
+    emitMetric(Metric.RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS, 
schemaSyncDuration.millisElapsed());
+  }
+
+  /**
+   * Retrieves all used segment schemas from the metadata store and builds a
+   * fresh map from schema fingerprint to payload.
+   */
+  private Map<String, SchemaPayload> 
buildSchemaFingerprintToPayloadMapForFullSync()
+  {
+    final List<SegmentSchemaRecord> records = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedSegmentSchemas
+    );
+
+    return records.stream().collect(
+        Collectors.toMap(
+            SegmentSchemaRecord::getFingerprint,
+            SegmentSchemaRecord::getPayload
+        )
+    );
+  }
+
+  /**
+   * Retrieves segment schemas from the metadata store if they are not present
+   * in the cache or have been recently updated in the metadata store. These
+   * segment schemas along with those already present in the cache are used to
+   * build a complete updated map from schema fingerprint to payload.
+   *
+   * @return Complete updated map from schema fingerprint to payload for all
+   * used segment schemas currently persisted in the metadata store.
+   */
+  private Map<String, SchemaPayload> 
buildSchemaFingerprintToPayloadMapForDeltaSync()
+  {
+    // Identify fingerprints in the cache and in the metadata store
+    final Map<String, SchemaPayload> schemaFingerprintToPayload = new 
HashMap<>(
+        segmentSchemaCache.getPublishedSchemaPayloadMap()
+    );
+    final Set<String> cachedFingerprints = 
Set.copyOf(schemaFingerprintToPayload.keySet());
+    final Set<String> persistedFingerprints = query(
+        SqlSegmentsMetadataQuery::retrieveAllUsedSegmentSchemaFingerprints
+    );
+
+    // Remove entry for schemas that have been deleted from the metadata store
+    final Set<String> deletedFingerprints = 
Sets.difference(cachedFingerprints, persistedFingerprints);
+    deletedFingerprints.forEach(schemaFingerprintToPayload::remove);
+
+    // Retrieve and add entry for schemas that have been added to the metadata 
store
+    final Set<String> addedFingerprints = 
Sets.difference(persistedFingerprints, cachedFingerprints);
+    final List<SegmentSchemaRecord> addedSegmentSchemaRecords = query(
+        sql -> sql.retrieveUsedSegmentSchemasForFingerprints(addedFingerprints)
+    );
+    if (addedSegmentSchemaRecords.size() < addedFingerprints.size()) {
+      emitMetric(Metric.SKIPPED_SEGMENT_SCHEMAS, addedFingerprints.size() - 
addedSegmentSchemaRecords.size());
+    }
+    addedSegmentSchemaRecords.forEach(
+        schema -> schemaFingerprintToPayload.put(schema.getFingerprint(), 
schema.getPayload())
+    );
+
+    return schemaFingerprintToPayload;
+  }
+
+  /**
+   * Builds a map from {@link SegmentId} to {@link SegmentMetadata} for all 
used
+   * segments currently present in the metadata store based on the current 
sync.
+   */
+  private Map<SegmentId, SegmentMetadata> buildSegmentIdToMetadataMapForSync(
+      Map<String, DatasourceSegmentSummary> datasourceToSummary
+  )
+  {
+    final Map<SegmentId, SegmentMetadata> cachedSegmentIdToMetadata =
+        segmentSchemaCache.getPublishedSegmentMetadataMap();
+
+    final Map<SegmentId, SegmentMetadata> syncedSegmentIdToMetadataMap = new 
HashMap<>(
+        cachedSegmentIdToMetadata
+    );
+
+    // Remove entry for segments not present in datasource cache (now synced 
with the metadata store)
+    cachedSegmentIdToMetadata.keySet().forEach(segmentId -> {
+      final DataSegment cachedSegment =
+          
getCacheForDatasource(segmentId.getDataSource()).findUsedSegment(segmentId);
+      if (cachedSegment == null) {
+        syncedSegmentIdToMetadataMap.remove(segmentId);
+      }
+    });
+
+    // Add entry for segments that have been added to the datasource cache
+    datasourceToSummary.values().forEach(summary -> {
+      summary.usedSegments.forEach(segment -> {
+        if (segment.getNumRows() != null && segment.getSchemaFingerprint() != 
null) {
+          syncedSegmentIdToMetadataMap.put(
+              segment.getDataSegment().getId(),
+              new SegmentMetadata(segment.getNumRows(), 
segment.getSchemaFingerprint())
+          );
+        }
+      });
+    });
+
+    return syncedSegmentIdToMetadataMap;
+  }
+
   /**
    * Tries to parse the fields of the result set into a {@link 
DataSegmentPlus}.
    *
    * @return null if an error occurred while parsing the result
    */
+  @Nullable
   private DataSegmentPlus mapToSegmentPlus(ResultSet resultSet)
   {
     String segmentId = null;
@@ -885,8 +1071,8 @@ public class HeapMemorySegmentMetadataCache implements 
SegmentMetadataCache
           DateTimes.of(resultSet.getString(3)),
           
SqlSegmentsMetadataQuery.nullAndEmptySafeDate(resultSet.getString(4)),
           true,
-          null,
-          null,
+          useSchemaCache ? resultSet.getString(5) : null,
+          useSchemaCache ? (Long) resultSet.getObject(6) : null,
           null
       );
     }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java 
b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
index 6ce9e245614..c8e87ca4d55 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
@@ -105,6 +105,11 @@ public class Metric
    */
   public static final String RETRIEVE_PENDING_SEGMENTS_DURATION_MILLIS = 
METRIC_NAME_PREFIX + "fetchPending/time";
 
+  /**
+   * Time taken in milliseconds to fetch all segment schemas from the metadata 
store.
+   */
+  public static final String RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS = 
METRIC_NAME_PREFIX + "fetchSchemas/time";
+
   /**
    * Time taken to update the datasource snapshot in the cache.
    */
@@ -148,6 +153,11 @@ public class Metric
    */
   public static final String SKIPPED_SEGMENTS = METRIC_NAME_PREFIX + "skipped";
 
+  /**
+   * Number of unparseable segment schema records skipped while refreshing the 
cache.
+   */
+  public static final String SKIPPED_SEGMENT_SCHEMAS = METRIC_NAME_PREFIX + 
"schema/skipped";
+
   /**
    * Number of unparseable pending segment records skipped while refreshing 
the cache.
    */
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentSchemaRecord.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentSchemaRecord.java
new file mode 100644
index 00000000000..b814f1a7fb8
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentSchemaRecord.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.segment.cache;
+
+import org.apache.druid.segment.SchemaPayload;
+
+/**
+ * Represents a single record in the druid_segmentSchemas table.
+ */
+public class SegmentSchemaRecord
+{
+  private final String fingerprint;
+  private final SchemaPayload payload;
+
+  public SegmentSchemaRecord(String fingerprint, SchemaPayload payload)
+  {
+    this.fingerprint = fingerprint;
+    this.payload = payload;
+  }
+
+  public String getFingerprint()
+  {
+    return fingerprint;
+  }
+
+  public SchemaPayload getPayload()
+  {
+    return payload;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
new file mode 100644
index 00000000000..c58c1f97209
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/NoopSegmentSchemaCache.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.metadata;
+
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SchemaPayloadPlus;
+import org.apache.druid.segment.SegmentMetadata;
+import org.apache.druid.timeline.SegmentId;
+
+import java.util.Map;
+
+/**
+ * No-op implementation of {@link SegmentSchemaCache} that always returns false
+ * for {@link #isEnabled()} and {@link #isInitialized()}.
+ */
+public class NoopSegmentSchemaCache extends SegmentSchemaCache
+{
+  @Override
+  public boolean isEnabled()
+  {
+    return false;
+  }
+
+  @Override
+  public void setInitialized()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void onLeaderStop()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isInitialized()
+  {
+    return false;
+  }
+
+  @Override
+  public void awaitInitialization() throws InterruptedException
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void resetSchemaForPublishedSegments(
+      Map<SegmentId, SegmentMetadata> usedSegmentIdToMetadata,
+      Map<String, SchemaPayload> schemaFingerprintToPayload
+  )
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void addRealtimeSegmentSchema(SegmentId segmentId, SchemaPayloadPlus 
schema)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void addSchemaPendingBackfill(SegmentId segmentId, SchemaPayloadPlus 
schema)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void markSchemaPersisted(SegmentId segmentId)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<SegmentId, SegmentMetadata> getPublishedSegmentMetadataMap()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isSchemaCached(SegmentId segmentId)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, SchemaPayload> getPublishedSchemaPayloadMap()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  SchemaPayloadPlus getTemporaryPublishedMetadataQueryResults(SegmentId id)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void segmentRemoved(SegmentId segmentId)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void realtimeSegmentRemoved(SegmentId segmentId)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, Integer> getStats()
+  {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
index 9a6830225dc..79276c70a92 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java
@@ -94,6 +94,15 @@ public class SegmentSchemaCache
    */
   private final AtomicInteger cacheMissCount = new AtomicInteger(0);
 
+  /**
+   * @return true if schema caching is enabled.
+   */
+  public boolean isEnabled()
+  {
+    // Always return true since this implementation is bound only when caching 
is enabled
+    return true;
+  }
+
   public void setInitialized()
   {
     if (!isInitialized()) {
@@ -224,9 +233,9 @@ public class SegmentSchemaCache
     }
 
     // segment schema has been polled from the DB
-    SegmentMetadata segmentMetadata = getSegmentMetadataMap().get(segmentId);
+    SegmentMetadata segmentMetadata = 
getPublishedSegmentMetadataMap().get(segmentId);
     if (segmentMetadata != null) {
-      SchemaPayload schemaPayload = 
getSchemaPayloadMap().get(segmentMetadata.getSchemaFingerprint());
+      SchemaPayload schemaPayload = 
getPublishedSchemaPayloadMap().get(segmentMetadata.getSchemaFingerprint());
       if (schemaPayload != null) {
         return Optional.of(
             new SchemaPayloadPlus(schemaPayload, segmentMetadata.getNumRows())
@@ -251,19 +260,27 @@ public class SegmentSchemaCache
 
   private boolean isPublishedSegmentSchemaCached(SegmentId segmentId)
   {
-    SegmentMetadata segmentMetadata = getSegmentMetadataMap().get(segmentId);
+    SegmentMetadata segmentMetadata = 
getPublishedSegmentMetadataMap().get(segmentId);
     if (segmentMetadata != null) {
-      return 
getSchemaPayloadMap().containsKey(segmentMetadata.getSchemaFingerprint());
+      return 
getPublishedSchemaPayloadMap().containsKey(segmentMetadata.getSchemaFingerprint());
     }
     return false;
   }
 
-  private Map<SegmentId, SegmentMetadata> getSegmentMetadataMap()
+  /**
+   * @return Immutable map from segment ID to {@link SegmentMetadata} for all
+   * published used segments currently present in this cache.
+   */
+  public Map<SegmentId, SegmentMetadata> getPublishedSegmentMetadataMap()
   {
     return publishedSegmentSchemas.get().segmentIdToMetadata;
   }
 
-  private Map<String, SchemaPayload> getSchemaPayloadMap()
+  /**
+   * @return Immutable map from schema fingerprint to {@link SchemaPayload} for
+   * all schema fingerprints currently present in this cache.
+   */
+  public Map<String, SchemaPayload> getPublishedSchemaPayloadMap()
   {
     return publishedSegmentSchemas.get().schemaFingerprintToPayload;
   }
@@ -298,8 +315,8 @@ public class SegmentSchemaCache
     return Map.of(
         Metric.CACHE_MISSES, cacheMissCount.getAndSet(0),
         Metric.REALTIME_SEGMENT_SCHEMAS, realtimeSegmentSchemas.size(),
-        Metric.USED_SEGMENT_SCHEMAS, getSegmentMetadataMap().size(),
-        Metric.USED_SEGMENT_SCHEMA_FINGERPRINTS, getSchemaPayloadMap().size(),
+        Metric.USED_SEGMENT_SCHEMAS, getPublishedSegmentMetadataMap().size(),
+        Metric.USED_SEGMENT_SCHEMA_FINGERPRINTS, 
getPublishedSchemaPayloadMap().size(),
         Metric.SCHEMAS_PENDING_BACKFILL, schemasPendingBackfill.size()
     );
   }
diff --git 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
index de4d85742a4..8c1ed5c59ae 100644
--- 
a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaManager.java
@@ -34,9 +34,9 @@ import org.apache.druid.metadata.SQLMetadataConnector;
 import org.apache.druid.segment.SchemaPayload;
 import org.apache.druid.segment.SchemaPayloadPlus;
 import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.PreparedBatch;
-import org.skife.jdbi.v2.TransactionCallback;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -154,7 +154,8 @@ public class SegmentSchemaManager
       final int version
   )
   {
-    connector.retryTransaction((TransactionCallback<Void>) (handle, status) -> 
{
+    connector.retryTransaction((handle, status) -> {
+      final DateTime updateTime = DateTimes.nowUtc();
       Map<String, SchemaPayload> schemaPayloadMap = new HashMap<>();
 
       for (SegmentSchemaMetadataPlus segmentSchema : segmentSchemas) {
@@ -163,8 +164,8 @@ public class SegmentSchemaManager
             segmentSchema.getSegmentSchemaMetadata().getSchemaPayload()
         );
       }
-      persistSegmentSchema(handle, dataSource, version, schemaPayloadMap);
-      updateSegmentWithSchemaInformation(handle, segmentSchemas);
+      persistSegmentSchema(handle, dataSource, version, schemaPayloadMap, 
updateTime);
+      updateSegmentWithSchemaInformation(handle, segmentSchemas, updateTime);
 
       return null;
     }, 1, 3);
@@ -177,7 +178,8 @@ public class SegmentSchemaManager
       final Handle handle,
       final String dataSource,
       final int version,
-      final Map<String, SchemaPayload> fingerprintSchemaPayloadMap
+      final Map<String, SchemaPayload> fingerprintSchemaPayloadMap,
+      final DateTime updateTime
   ) throws JsonProcessingException
   {
     if (fingerprintSchemaPayloadMap.isEmpty()) {
@@ -244,7 +246,7 @@ public class SegmentSchemaManager
     PreparedBatch schemaInsertBatch = handle.prepareBatch(insertSql);
     for (List<String> partition : partitionedFingerprints) {
       for (String fingerprint : partition) {
-        final String now = DateTimes.nowUtc().toString();
+        final String now = updateTime.toString();
         schemaInsertBatch.add()
                          .bind("created_date", now)
                          .bind("datasource", dataSource)
@@ -280,7 +282,8 @@ public class SegmentSchemaManager
    */
   public void updateSegmentWithSchemaInformation(
       final Handle handle,
-      final List<SegmentSchemaMetadataPlus> batch
+      final List<SegmentSchemaMetadataPlus> batch,
+      final DateTime updateTime
   )
   {
     log.debug("Updating segment with schemaFingerprint and numRows 
information: [%s].", batch);
@@ -288,7 +291,11 @@ public class SegmentSchemaManager
     // update schemaFingerprint and numRows in segments table
     String updateSql =
         StringUtils.format(
-            "UPDATE %s SET schema_fingerprint = :schema_fingerprint, num_rows 
= :num_rows WHERE id = :id",
+            "UPDATE %s"
+            + " SET schema_fingerprint = :schema_fingerprint,"
+            + " num_rows = :num_rows,"
+            + " used_status_last_updated = :last_updated"
+            + " WHERE id = :id",
             dbTables.getSegmentsTable()
         );
 
@@ -304,7 +311,8 @@ public class SegmentSchemaManager
         segmentUpdateBatch.add()
                           .bind("id", segmentSchema.getSegmentId().toString())
                           .bind("schema_fingerprint", fingerprint)
-                          .bind("num_rows", 
segmentSchema.getSegmentSchemaMetadata().getNumRows());
+                          .bind("num_rows", 
segmentSchema.getSegmentSchemaMetadata().getNumRows())
+                          .bind("last_updated", updateTime.toString());
       }
 
       final int[] affectedRows = segmentUpdateBatch.execute();
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
index 7007607a2b0..2406af6f12b 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorMarkUsedTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.metadata;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -79,7 +78,6 @@ public class IndexerSQLMetadataStorageCoordinatorMarkUsedTest 
extends IndexerSql
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnector,
         new TestDruidLeaderSelector(),
-        Set.of(NodeRole.OVERLORD),
         NoopSegmentMetadataCache.instance(),
         NoopServiceEmitter.instance()
     )
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
index 5f5d0db9f58..48656aa93e9 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
@@ -29,12 +29,14 @@ import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
+import 
org.apache.druid.metadata.segment.SqlSegmentMetadataReadOnlyTransactionFactory;
 import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
 import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
 import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.apache.druid.segment.TestDataSource;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
 import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import 
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
@@ -101,6 +103,7 @@ public class 
IndexerSQLMetadataStorageCoordinatorReadOnlyTest extends IndexerSql
         mapper,
         () -> new SegmentsMetadataManagerConfig(null, cacheMode),
         derbyConnectorRule.metadataTablesConfigSupplier(),
+        new NoopSegmentSchemaCache(),
         derbyConnector,
         (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
             nameFormat,
@@ -151,15 +154,23 @@ public class 
IndexerSQLMetadataStorageCoordinatorReadOnlyTest extends IndexerSql
       NodeRole nodeRole
   )
   {
-    final SegmentMetadataTransactionFactory transactionFactory = new 
SqlSegmentMetadataTransactionFactory(
-        mapper,
-        derbyConnectorRule.metadataTablesConfigSupplier().get(),
-        derbyConnector,
-        leaderSelector,
-        Set.of(nodeRole),
-        segmentMetadataCache,
-        emitter
-    );
+    final SegmentMetadataTransactionFactory transactionFactory;
+    if (nodeRole.equals(NodeRole.COORDINATOR)) {
+      transactionFactory = new SqlSegmentMetadataReadOnlyTransactionFactory(
+          mapper,
+          derbyConnectorRule.metadataTablesConfigSupplier().get(),
+          derbyConnector
+      );
+    } else {
+      transactionFactory = new SqlSegmentMetadataTransactionFactory(
+          mapper,
+          derbyConnectorRule.metadataTablesConfigSupplier().get(),
+          derbyConnector,
+          leaderSelector,
+          segmentMetadataCache,
+          emitter
+      );
+    }
 
     return new IndexerSQLMetadataStorageCoordinator(
         transactionFactory,
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index d5572c5b058..9a1b7ce63ed 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.StringTuple;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.error.ExceptionMatcher;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -45,6 +44,7 @@ import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.TestDataSource;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.FingerprintGenerator;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
 import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.segment.metadata.SegmentSchemaTestUtils;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -153,6 +153,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         mapper,
         () -> new SegmentsMetadataManagerConfig(null, cacheMode),
         derbyConnectorRule.metadataTablesConfigSupplier(),
+        new NoopSegmentSchemaCache(),
         derbyConnector,
         (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(
             nameFormat,
@@ -177,7 +178,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnector,
         leaderSelector,
-        Set.of(NodeRole.OVERLORD),
         segmentMetadataCache,
         emitter
     )
@@ -997,7 +997,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
   public void testRetrieveSegmentForId()
   {
     coordinator.commitSegments(Set.of(defaultSegment), null);
-    markAllSegmentsUnused(ImmutableSet.of(defaultSegment), DateTimes.nowUtc());
+    coordinator.markSegmentAsUnused(defaultSegment.getId());
     Assert.assertEquals(
         defaultSegment,
         coordinator.retrieveSegmentForId(defaultSegment.getId())
@@ -2613,8 +2613,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(1, identifier3.getShardSpec().getNumCorePartitions());
 
     // now drop the used segment previously loaded:
-    markAllSegmentsUnused(ImmutableSet.of(segment), DateTimes.nowUtc());
-    refreshCache();
+    coordinator.markSegmentAsUnused(segment.getId());
 
     // and final load, this reproduces an issue that could happen with 
multiple streaming appends,
     // followed by a reindex, followed by a drop, and more streaming data 
coming in for same interval
@@ -2784,8 +2783,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     // 5) reverted compaction (by marking B_0 as unused)
     // Revert compaction a manual metadata update which is basically the 
following two steps:
-    markAllSegmentsUnused(ImmutableSet.of(compactedSegment), 
DateTimes.nowUtc()); // <- drop compacted segment
-    refreshCache();
+    coordinator.markSegmentAsUnused(compactedSegment.getId());
     //        pending: version = A, id = 0,1,2
     //                 version = B, id = 1
     //
@@ -3638,8 +3636,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertTrue(coordinator.commitSegments(dataSegments, new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(dataSegments));
 
     // Mark the tombstone as unused
-    markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
-    refreshCache();
+    coordinator.markSegmentAsUnused(tombstoneSegment.getId());
 
     final Collection<DataSegment> allUsedSegments = 
coordinator.retrieveAllUsedSegments(
         TestDataSource.WIKI,
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
index a4eddec3d4e..d98e54c3ea5 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java
@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.metadata.segment.SegmentMetadataTransaction;
@@ -99,7 +99,6 @@ public class 
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         derbyConnector,
         new TestDruidLeaderSelector(),
-        Set.of(NodeRole.OVERLORD),
         NoopSegmentMetadataCache.instance(),
         NoopServiceEmitter.instance()
     );
@@ -449,7 +448,8 @@ public class 
IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
           handle,
           "fooDataSource",
           CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
-          schemaPayloadMapToPerist
+          schemaPayloadMapToPerist,
+          DateTimes.nowUtc()
       );
       return null;
     });
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
index a5e30ae90a7..17a02f19a73 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java
@@ -50,6 +50,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.skife.jdbi.v2.PreparedBatch;
+import org.skife.jdbi.v2.PreparedBatchPart;
 import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.util.StringMapper;
 
@@ -578,11 +579,12 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
       );
     }
 
-    insertSegments(usedSegments, derbyConnectorRule, jsonMapper);
+    insertSegments(usedSegments, false, derbyConnectorRule, jsonMapper);
   }
 
   public static void insertSegments(
       Set<DataSegmentPlus> dataSegments,
+      boolean includeSchema,
       TestDerbyConnector.DerbyConnectorRule derbyConnectorRule,
       ObjectMapper jsonMapper
   )
@@ -590,23 +592,15 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
     final TestDerbyConnector connector = derbyConnectorRule.getConnector();
     final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
 
+    final String sql = getSegmentInsertSql(includeSchema, table, connector);
     connector.retryWithHandle(
         handle -> {
-          PreparedBatch preparedBatch = handle.prepareBatch(
-              StringUtils.format(
-                  "INSERT INTO %1$s (id, dataSource, created_date, start, 
%2$send%2$s, partitioned, version,"
-                  + " used, payload, used_status_last_updated, 
upgraded_from_segment_id) "
-                  + "VALUES (:id, :dataSource, :created_date, :start, :end, 
:partitioned, :version,"
-                  + " :used, :payload, :used_status_last_updated, 
:upgraded_from_segment_id)",
-                  table,
-                  connector.getQuoteString()
-              )
-          );
+          PreparedBatch preparedBatch = handle.prepareBatch(sql);
           for (DataSegmentPlus segmentPlus : dataSegments) {
             final DataSegment segment = segmentPlus.getDataSegment();
             String id = segment.getId().toString();
-            preparedBatch.add()
-                         .bind("id", id)
+            final PreparedBatchPart segmentRecord = preparedBatch.add();
+            segmentRecord.bind("id", id)
                          .bind("dataSource", segment.getDataSource())
                          .bind("created_date", 
nullSafeString(segmentPlus.getCreatedDate()))
                          .bind("start", 
segment.getInterval().getStart().toString())
@@ -617,6 +611,11 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
                          .bind("payload", 
jsonMapper.writeValueAsBytes(segment))
                          .bind("used_status_last_updated", 
nullSafeString(segmentPlus.getUsedStatusLastUpdatedDate()))
                          .bind("upgraded_from_segment_id", 
segmentPlus.getUpgradedFromSegmentId());
+
+            if (includeSchema) {
+              segmentRecord.bind("num_rows", segmentPlus.getNumRows())
+                           .bind("schema_fingerprint", 
segmentPlus.getSchemaFingerprint());
+            }
           }
 
           final int[] affectedRows = preparedBatch.execute();
@@ -629,6 +628,31 @@ public class IndexerSqlMetadataStorageCoordinatorTestBase
     );
   }
 
+  private static String getSegmentInsertSql(boolean includeSchema, String 
table, TestDerbyConnector connector)
+  {
+    final String sql;
+    if (includeSchema) {
+      sql = StringUtils.format(
+          "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, 
partitioned, version,"
+          + " used, payload, used_status_last_updated, 
upgraded_from_segment_id, num_rows, schema_fingerprint) "
+          + "VALUES (:id, :dataSource, :created_date, :start, :end, 
:partitioned, :version,"
+          + " :used, :payload, :used_status_last_updated, 
:upgraded_from_segment_id, :num_rows, :schema_fingerprint)",
+          table,
+          connector.getQuoteString()
+      );
+    } else {
+      sql = StringUtils.format(
+          "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, 
partitioned, version,"
+          + " used, payload, used_status_last_updated, 
upgraded_from_segment_id) "
+          + "VALUES (:id, :dataSource, :created_date, :start, :end, 
:partitioned, :version,"
+          + " :used, :payload, :used_status_last_updated, 
:upgraded_from_segment_id)",
+          table,
+          connector.getQuoteString()
+      );
+    }
+    return sql;
+  }
+
   @Nullable
   private static String nullSafeString(DateTime date)
   {
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
index 7e2aff60900..fb310430ac3 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
@@ -57,7 +57,7 @@ public class SqlSegmentsMetadataManagerProviderTest
         connector,
         lifecycle,
         segmentSchemaCache,
-        CentralizedDatasourceSchemaConfig.create(),
+        CentralizedDatasourceSchemaConfig::create,
         NoopServiceEmitter.instance()
     );
     SegmentsMetadataManager manager = provider.get();
diff --git 
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
 
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
index 2f5c18dcd79..e5554b931ee 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
@@ -21,7 +21,6 @@ package org.apache.druid.metadata.segment;
 
 import com.google.common.base.Suppliers;
 import org.apache.druid.client.DataSourcesSnapshot;
-import org.apache.druid.error.ExceptionMatcher;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
@@ -31,16 +30,16 @@ import 
org.apache.druid.metadata.SqlSegmentsMetadataManagerTestBase;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
 import org.apache.druid.metadata.segment.cache.Metric;
-import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
 import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.apache.druid.segment.TestDataSource;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
 import 
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
 import org.apache.druid.timeline.DataSegment;
 import org.assertj.core.util.Sets;
-import org.hamcrest.MatcherAssert;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 import org.junit.After;
@@ -82,7 +81,8 @@ public class SqlSegmentsMetadataManagerV2Test extends 
SqlSegmentsMetadataManager
   }
 
   private void initManager(
-      SegmentMetadataCache.UsageMode cacheMode
+      SegmentMetadataCache.UsageMode cacheMode,
+      boolean useSchemaCache
   )
   {
     segmentMetadataCacheExec = new BlockingExecutorService("test");
@@ -90,6 +90,7 @@ public class SqlSegmentsMetadataManagerV2Test extends 
SqlSegmentsMetadataManager
         jsonMapper,
         Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
         Suppliers.ofInstance(storageConfig),
+        useSchemaCache ? new SegmentSchemaCache() : new 
NoopSegmentSchemaCache(),
         connector,
         (poolSize, name) -> new WrappingScheduledExecutorService(name, 
segmentMetadataCacheExec, false),
         emitter
@@ -103,7 +104,7 @@ public class SqlSegmentsMetadataManagerV2Test extends 
SqlSegmentsMetadataManager
         connector,
         Suppliers.ofInstance(config),
         derbyConnectorRule.metadataTablesConfigSupplier(),
-        CentralizedDatasourceSchemaConfig.create(),
+        CentralizedDatasourceSchemaConfig::create,
         emitter,
         jsonMapper
     );
@@ -132,7 +133,7 @@ public class SqlSegmentsMetadataManagerV2Test extends 
SqlSegmentsMetadataManager
   @Test
   public void test_manager_usesCachedSegments_ifCacheIsEnabled()
   {
-    initManager(SegmentMetadataCache.UsageMode.ALWAYS);
+    initManager(SegmentMetadataCache.UsageMode.ALWAYS, false);
 
     manager.startPollingDatabasePeriodically();
     Assert.assertTrue(manager.isPollingDatabasePeriodically());
@@ -146,13 +147,16 @@ public class SqlSegmentsMetadataManagerV2Test extends 
SqlSegmentsMetadataManager
 
     emitter.verifyNotEmitted("segment/poll/time");
     emitter.verifyNotEmitted("segment/pollWithSchema/time");
+    emitter.verifyNotEmitted(Metric.RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS);
+    emitter.verifyNotEmitted("segment/schemaCache/used/count");
+
     emitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 2);
   }
 
   @Test
   public void test_manager_pollsSegments_ifCacheIsDisabled()
   {
-    initManager(SegmentMetadataCache.UsageMode.NEVER);
+    initManager(SegmentMetadataCache.UsageMode.NEVER, false);
 
     manager.startPollingDatabasePeriodically();
     Assert.assertTrue(manager.isPollingDatabasePeriodically());
@@ -167,34 +171,25 @@ public class SqlSegmentsMetadataManagerV2Test extends 
SqlSegmentsMetadataManager
   }
 
   @Test
-  public void test_manager_throwsException_ifBothCacheAndSchemaAreEnabled()
+  public void 
test_manager_usesCachedSegmentsAndSchemas_ifBothCacheAndSchemaAreEnabled()
   {
-    MatcherAssert.assertThat(
-        Assert.assertThrows(
-            IllegalArgumentException.class,
-            () -> new SqlSegmentsMetadataManagerV2(
-                new NoopSegmentMetadataCache() {
-                  @Override
-                  public boolean isEnabled()
-                  {
-                    return true;
-                  }
-                },
-                segmentSchemaCache,
-                connector,
-                Suppliers.ofInstance(config),
-                derbyConnectorRule.metadataTablesConfigSupplier(),
-                CentralizedDatasourceSchemaConfig.enabled(true),
-                emitter,
-                jsonMapper
-            )
-        ),
-        ExceptionMatcher.of(IllegalArgumentException.class).expectMessageIs(
-            "Segment metadata incremental 
cache['druid.manager.segments.useIncrementalCache']"
-            + " and segment schema 
cache['druid.centralizedDatasourceSchema.enabled']"
-            + " must not be enabled together."
-        )
-    );
+    initManager(SegmentMetadataCache.UsageMode.ALWAYS, true);
+
+    manager.startPollingDatabasePeriodically();
+    Assert.assertTrue(manager.isPollingDatabasePeriodically());
+
+    syncSegmentMetadataCache();
+    verifyDatasourceSnapshot();
+
+    // isPolling returns true even after stop since cache is still polling the 
metadata store
+    manager.stopPollingDatabasePeriodically();
+    Assert.assertTrue(manager.isPollingDatabasePeriodically());
+
+    emitter.verifyNotEmitted("segment/poll/time");
+    emitter.verifyNotEmitted("segment/pollWithSchema/time");
+    emitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 2);
+    emitter.verifyEmitted(Metric.RETRIEVE_SEGMENT_SCHEMAS_DURATION_MILLIS, 2);
+    emitter.verifyEmitted("segment/schemaCache/used/count", 2);
   }
 
   private void verifyDatasourceSnapshot()
diff --git 
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
 
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
index 48983284071..2a5940b0495 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
@@ -32,8 +32,16 @@ import 
org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
 import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.segment.SchemaPayload;
+import org.apache.druid.segment.SegmentMetadata;
 import org.apache.druid.segment.TestDataSource;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.metadata.FingerprintGenerator;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaTestUtils;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
@@ -52,13 +60,14 @@ import org.junit.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class HeapMemorySegmentMetadataCacheTest
 {
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule
-      = new TestDerbyConnector.DerbyConnectorRule();
+      = new 
TestDerbyConnector.DerbyConnectorRule(CentralizedDatasourceSchemaConfig.enabled(true));
 
   private BlockingExecutorService pollExecutor;
   private ScheduledExecutorFactory executorFactory;
@@ -66,6 +75,8 @@ public class HeapMemorySegmentMetadataCacheTest
   private StubServiceEmitter serviceEmitter;
 
   private HeapMemorySegmentMetadataCache cache;
+  private SegmentSchemaCache schemaCache;
+  private SegmentSchemaTestUtils schemaTestUtils;
 
   @Before
   public void setup()
@@ -76,8 +87,10 @@ public class HeapMemorySegmentMetadataCacheTest
     serviceEmitter = new StubServiceEmitter();
 
     derbyConnector.createSegmentTable();
+    derbyConnector.createSegmentSchemasTable();
     derbyConnector.createPendingSegmentsTable();
 
+    schemaTestUtils = new SegmentSchemaTestUtils(derbyConnectorRule, 
derbyConnector, TestHelper.JSON_MAPPER);
     EmittingLogger.registerEmitter(serviceEmitter);
   }
 
@@ -90,26 +103,41 @@ public class HeapMemorySegmentMetadataCacheTest
     }
   }
 
+  private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode)
+  {
+    setupTargetWithCaching(cacheMode, false);
+  }
+
   /**
    * Creates the target {@link #cache} to be tested in the current test.
    */
-  private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode)
+  private void setupTargetWithCaching(SegmentMetadataCache.UsageMode 
cacheMode, boolean useSchemaCache)
   {
     if (cache != null) {
       throw new ISE("Test target has already been initialized with 
caching[%s]", cache.isEnabled());
     }
     final SegmentsMetadataManagerConfig metadataManagerConfig
         = new SegmentsMetadataManagerConfig(null, cacheMode);
+    schemaCache = useSchemaCache ? new SegmentSchemaCache() : new 
NoopSegmentSchemaCache();
     cache = new HeapMemorySegmentMetadataCache(
         TestHelper.JSON_MAPPER,
         () -> metadataManagerConfig,
         derbyConnectorRule.metadataTablesConfigSupplier(),
+        schemaCache,
         derbyConnector,
         executorFactory,
         serviceEmitter
     );
   }
 
+  private void setupAndSyncCacheWithSchema()
+  {
+    setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS, true);
+    cache.start();
+    cache.becomeLeader();
+    syncCacheAfterBecomingLeader();
+  }
+
   private void setupAndSyncCache()
   {
     setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
@@ -692,10 +720,141 @@ public class HeapMemorySegmentMetadataCacheTest
     serviceEmitter.verifyValue(Metric.DELETED_DATASOURCES, 1L);
   }
 
+  @Test
+  public void test_sync_addsUsedSegmentSchema_ifNotPresentInCache()
+  {
+    setupAndSyncCacheWithSchema();
+
+    Assert.assertTrue(
+        schemaCache.getPublishedSchemaPayloadMap().isEmpty()
+    );
+
+    final SchemaPayload payload = new SchemaPayload(
+        RowSignature.builder().add("col1", null).build()
+    );
+    final String fingerprint = getSchemaFingerprint(payload);
+    schemaTestUtils.insertSegmentSchema(
+        TestDataSource.WIKI,
+        Map.of(fingerprint, payload),
+        Set.of(fingerprint)
+    );
+
+    syncCache();
+    serviceEmitter.verifyValue("segment/schemaCache/usedFingerprint/count", 
1L);
+
+    Assert.assertEquals(
+        Map.of(fingerprint, payload),
+        schemaCache.getPublishedSchemaPayloadMap()
+    );
+  }
+
+  @Test
+  public void test_sync_removesUsedSegmentSchema_ifNotPresentInMetadataStore()
+  {
+    setupAndSyncCacheWithSchema();
+
+    final SchemaPayload payload = new SchemaPayload(
+        RowSignature.builder().add("col1", null).build()
+    );
+    final String fingerprint = getSchemaFingerprint(payload);
+
+    schemaCache.resetSchemaForPublishedSegments(
+        Map.of(),
+        Map.of(fingerprint, payload)
+    );
+    Assert.assertEquals(
+        Map.of(fingerprint, payload),
+        schemaCache.getPublishedSchemaPayloadMap()
+    );
+
+    syncCache();
+    serviceEmitter.verifyValue("segment/schemaCache/usedFingerprint/count", 
0L);
+
+    Assert.assertTrue(
+        schemaCache.getPublishedSchemaPayloadMap().isEmpty()
+    );
+  }
+
+  @Test
+  public void test_sync_addsUsedSegmentMetadata_ifNotPresentInCache()
+  {
+    setupAndSyncCacheWithSchema();
+
+    Assert.assertTrue(
+        schemaCache.getPublishedSegmentMetadataMap().isEmpty()
+    );
+
+    final SchemaPayload payload = new SchemaPayload(
+        RowSignature.builder().add("col1", null).build()
+    );
+    final String fingerprint = getSchemaFingerprint(payload);
+
+    final DataSegmentPlus usedSegmentPlus
+        = CreateDataSegments.ofDatasource(TestDataSource.WIKI)
+                            
.withNumRows(10L).withSchemaFingerprint(fingerprint)
+                            .updatedNow().markUsed().asPlus();
+    insertSegmentsInMetadataStoreWithSchema(usedSegmentPlus);
+
+    syncCache();
+    serviceEmitter.verifyValue(Metric.PERSISTED_USED_SEGMENTS, 1L);
+    serviceEmitter.verifyValue(Metric.CACHED_USED_SEGMENTS, 1L);
+    serviceEmitter.verifyValue(Metric.UPDATED_USED_SEGMENTS, 1L);
+    serviceEmitter.verifyValue("segment/schemaCache/used/count", 1L);
+
+    Assert.assertEquals(
+        Map.of(usedSegmentPlus.getDataSegment().getId(), new 
SegmentMetadata(10L, fingerprint)),
+        schemaCache.getPublishedSegmentMetadataMap()
+    );
+  }
+
+  @Test
+  public void 
test_sync_removesUsedSegmentMetadata_ifNotPresentInMetadataStore()
+  {
+    setupAndSyncCacheWithSchema();
+
+    final SchemaPayload payload = new SchemaPayload(
+        RowSignature.builder().add("col1", null).build()
+    );
+    final String fingerprint = getSchemaFingerprint(payload);
+    final SegmentId segmentId = SegmentId.dummy(TestDataSource.WIKI);
+    final SegmentMetadata metadata = new SegmentMetadata(10L, fingerprint);
+
+    schemaCache.resetSchemaForPublishedSegments(
+        Map.of(segmentId, metadata),
+        Map.of()
+    );
+    Assert.assertEquals(
+        Map.of(segmentId, metadata),
+        schemaCache.getPublishedSegmentMetadataMap()
+    );
+
+    syncCache();
+    serviceEmitter.verifyValue("segment/schemaCache/used/count", 0L);
+
+    Assert.assertTrue(
+        schemaCache.getPublishedSegmentMetadataMap().isEmpty()
+    );
+  }
+
+  private static String getSchemaFingerprint(SchemaPayload payload)
+  {
+    return new 
FingerprintGenerator(TestHelper.JSON_MAPPER).generateFingerprint(
+        payload,
+        TestDataSource.WIKI,
+        CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+    );
+  }
+
   private void insertSegmentsInMetadataStore(Set<DataSegmentPlus> segments)
   {
     IndexerSqlMetadataStorageCoordinatorTestBase
-        .insertSegments(segments, derbyConnectorRule, TestHelper.JSON_MAPPER);
+        .insertSegments(segments, false, derbyConnectorRule, 
TestHelper.JSON_MAPPER);
+  }
+
+  private void insertSegmentsInMetadataStoreWithSchema(DataSegmentPlus... 
segments)
+  {
+    IndexerSqlMetadataStorageCoordinatorTestBase
+        .insertSegments(Set.of(segments), true, derbyConnectorRule, 
TestHelper.JSON_MAPPER);
   }
 
   private void updateSegmentInMetadataStore(DataSegmentPlus segment)
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
index b5775409e69..ab3c6efcee1 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
@@ -61,6 +61,8 @@ public class CreateDataSegments
   private Boolean used;
   private DateTime lastUpdatedTime;
   private String upgradedFromSegmentId;
+  private String schemaFingerprint;
+  private Long numRows;
 
   public static CreateDataSegments ofDatasource(String datasource)
   {
@@ -109,6 +111,18 @@ public class CreateDataSegments
     return this;
   }
 
+  public CreateDataSegments withNumRows(Long numRows)
+  {
+    this.numRows = numRows;
+    return this;
+  }
+
+  public CreateDataSegments withSchemaFingerprint(String schemaFingerprint)
+  {
+    this.schemaFingerprint = schemaFingerprint;
+    return this;
+  }
+
   public CreateDataSegments markUnused()
   {
     this.used = false;
@@ -186,8 +200,8 @@ public class CreateDataSegments
         DateTimes.nowUtc(),
         lastUpdatedTime,
         used,
-        null,
-        null,
+        schemaFingerprint,
+        numRows,
         upgradedFromSegmentId
     );
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
index c5e2012d02a..dae4d48a1bf 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java
@@ -229,7 +229,8 @@ public class KillUnreferencedSegmentSchemaTest
               handle,
               "foo",
               CentralizedDatasourceSchemaConfig.SCHEMA_VERSION,
-              Collections.singletonMap(fingerprint, schemaPayload)
+              Collections.singletonMap(fingerprint, schemaPayload),
+              DateTimes.nowUtc()
           );
           return null;
         }
@@ -304,7 +305,8 @@ public class KillUnreferencedSegmentSchemaTest
               handle,
               "foo",
               0,
-              Collections.singletonMap(fingerprintOldVersion, schemaPayload)
+              Collections.singletonMap(fingerprintOldVersion, schemaPayload),
+              DateTimes.nowUtc()
           );
           return null;
         }
@@ -316,7 +318,8 @@ public class KillUnreferencedSegmentSchemaTest
               handle,
               "foo",
               1,
-              Collections.singletonMap(fingerprintNewVersion, schemaPayload)
+              Collections.singletonMap(fingerprintNewVersion, schemaPayload),
+              DateTimes.nowUtc()
           );
           return null;
         }
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java 
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 4adf29d6238..5a94d73a3e8 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -47,6 +47,8 @@ import org.apache.druid.guice.JsonConfigurator;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.MetadataConfigModule;
+import org.apache.druid.guice.MetadataManagerModule;
 import org.apache.druid.guice.QueryableModule;
 import org.apache.druid.guice.SegmentSchemaCacheModule;
 import org.apache.druid.guice.SupervisorCleanupModule;
@@ -61,12 +63,8 @@ import 
org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.metadata.MetadataRuleManager;
-import org.apache.druid.metadata.MetadataRuleManagerProvider;
 import org.apache.druid.metadata.MetadataStorage;
 import org.apache.druid.metadata.MetadataStorageProvider;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
 import org.apache.druid.query.lookup.LookupSerdeModule;
 import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
 import org.apache.druid.segment.metadata.SegmentMetadataCacheConfig;
@@ -74,7 +72,6 @@ import 
org.apache.druid.server.compaction.CompactionStatusTracker;
 import org.apache.druid.server.coordinator.CloneStatusManager;
 import org.apache.druid.server.coordinator.CoordinatorConfigManager;
 import org.apache.druid.server.coordinator.DruidCoordinator;
-import org.apache.druid.server.coordinator.MetadataManager;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
 import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs;
 import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig;
@@ -145,7 +142,7 @@ public class CliCoordinator extends ServerRunnable
   {
     this.properties = properties;
     beOverlord = isOverlord(properties);
-    isSegmentSchemaCacheEnabled = isSegmentSchemaCacheEnabled(properties);
+    isSegmentSchemaCacheEnabled = 
MetadataConfigModule.isSegmentSchemaCacheEnabled(properties);
 
     if (beOverlord) {
       log.info("Coordinator is configured to act as Overlord as well (%s = 
true).", AS_OVERLORD_PROPERTY);
@@ -167,6 +164,7 @@ public class CliCoordinator extends ServerRunnable
     List<Module> modules = new ArrayList<>();
 
     modules.add(JettyHttpClientModule.global());
+    modules.add(new MetadataManagerModule());
 
     if (isSegmentSchemaCacheEnabled) {
       validateCentralizedDatasourceSchemaConfig(properties);
@@ -213,19 +211,9 @@ public class CliCoordinator extends ServerRunnable
               
binder.bind(DirectDruidClientFactory.class).toProvider(Providers.of(null));
             }
 
-            binder.bind(SegmentsMetadataManager.class)
-                  .toProvider(SegmentsMetadataManagerProvider.class)
-                  .in(ManageLifecycle.class);
-
-            binder.bind(MetadataRuleManager.class)
-                  .toProvider(MetadataRuleManagerProvider.class)
-                  .in(ManageLifecycle.class);
-
             
binder.bind(LookupCoordinatorManager.class).in(LazySingleton.class);
             binder.bind(CloneStatusManager.class).in(LazySingleton.class);
 
-            binder.bind(CoordinatorConfigManager.class);
-            binder.bind(MetadataManager.class);
             binder.bind(DruidCoordinator.class);
             binder.bind(CompactionStatusTracker.class).in(LazySingleton.class);
 
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java 
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 5cdf7fefd3e..95fa654674d 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -50,6 +50,7 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ListProvider;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.guice.MetadataManagerModule;
 import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.SupervisorModule;
 import org.apache.druid.guice.annotations.Json;
@@ -108,8 +109,6 @@ import 
org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManag
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.query.lookup.LookupSerdeModule;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
@@ -118,7 +117,6 @@ import 
org.apache.druid.segment.realtime.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import 
org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.server.compaction.CompactionStatusTracker;
-import org.apache.druid.server.coordinator.CoordinatorConfigManager;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
 import org.apache.druid.server.coordinator.DruidCompactionConfig;
 import org.apache.druid.server.http.RedirectFilter;
@@ -196,6 +194,7 @@ public class CliOverlord extends ServerRunnable
   protected List<? extends Module> getModules(final boolean standalone)
   {
     return ImmutableList.of(
+        standalone ? new MetadataManagerModule() : binder -> {},
         new Module()
         {
           @Override
@@ -211,10 +210,6 @@ public class CliOverlord extends ServerRunnable
               
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8290);
 
               
binder.bind(CompactionStatusTracker.class).in(LazySingleton.class);
-              binder.bind(SegmentsMetadataManager.class)
-                    .toProvider(SegmentsMetadataManagerProvider.class)
-                    .in(ManageLifecycle.class);
-              
binder.bind(CoordinatorConfigManager.class).in(LazySingleton.class);
             }
 
             JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", 
CoordinatorOverlordServiceConfig.class);
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 065a6430563..e9cbab108d3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -73,12 +73,9 @@ import org.apache.druid.indexer.report.TaskReportFileWriter;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
-import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
 import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
-import org.apache.druid.indexing.common.actions.TaskActionToolbox;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import 
org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient;
@@ -86,11 +83,8 @@ import 
org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProviderImpl;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
-import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
-import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
 import org.apache.druid.indexing.overlord.TaskRunner;
-import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
 import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
@@ -99,7 +93,6 @@ import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.input.InputSourceModule;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.QuerySegmentWalker;
@@ -485,29 +478,9 @@ public class CliPeon extends GuiceRunnable
 
   private static void configureTaskActionClient(Binder binder)
   {
-    PolyBind.createChoice(
-        binder,
-        "druid.peon.mode",
-        Key.get(TaskActionClientFactory.class),
-        Key.get(RemoteTaskActionClientFactory.class)
-    );
-    final MapBinder<String, TaskActionClientFactory> taskActionBinder =
-        PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
-    taskActionBinder
-        .addBinding("local")
-        .to(LocalTaskActionClientFactory.class)
-        .in(LazySingleton.class);
-    // all of these bindings are so that we can run the peon in local mode
-    JsonConfigProvider.bind(binder, "druid.indexer.storage", 
TaskStorageConfig.class);
-    
binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
-    binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
-    binder.bind(IndexerMetadataStorageCoordinator.class)
-          .to(IndexerSQLMetadataStorageCoordinator.class)
+    // Peons no longer support the "druid.peon.mode" local/remote PolyBind choice:
+    // the remote task action client (which talks to the Overlord) is now the only
+    // implementation, so the local-mode storage/toolbox/coordinator bindings go away.
+    binder.bind(TaskActionClientFactory.class)
+          .to(RemoteTaskActionClientFactory.class)
           .in(LazySingleton.class);
-    taskActionBinder
-        .addBinding("remote")
-        .to(RemoteTaskActionClientFactory.class)
-        .in(LazySingleton.class);
 
     
 binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.PEON);
   }
diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
index 5faf6e134c6..3dc1711a50e 100644
--- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
+++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
@@ -33,13 +33,13 @@ import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
+import org.apache.druid.guice.MetadataConfigModule;
 import org.apache.druid.guice.ServerViewModule;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.server.DruidNode;
 
 import java.lang.annotation.Annotation;
@@ -53,9 +53,6 @@ import java.util.Set;
  */
 public abstract class ServerRunnable extends GuiceRunnable
 {
-  private static final String CENTRALIZED_DATASOURCE_SCHEMA_ENABLED =
-      CentralizedDatasourceSchemaConfig.PROPERTY_PREFIX + ".enabled";
-
   private static final EmittingLogger log = new EmittingLogger(ServerRunnable.class);
 
   public ServerRunnable(Logger log)
@@ -207,7 +204,7 @@ public abstract class ServerRunnable extends GuiceRunnable
 
   protected static void validateCentralizedDatasourceSchemaConfig(Properties properties)
   {
-    if (isSegmentSchemaCacheEnabled(properties)) {
+    if (MetadataConfigModule.isSegmentSchemaCacheEnabled(properties)) {
       String serverViewType = properties.getProperty(ServerViewModule.SERVERVIEW_TYPE_PROPERTY);
       if (serverViewType != null && !serverViewType.equals(ServerViewModule.SERVERVIEW_TYPE_HTTP)) {
         throw DruidException
@@ -221,15 +218,10 @@ public abstract class ServerRunnable extends GuiceRunnable
                     ServerViewModule.SERVERVIEW_TYPE_PROPERTY,
                     serverViewType,
                     ServerViewModule.SERVERVIEW_TYPE_HTTP,
-                    CENTRALIZED_DATASOURCE_SCHEMA_ENABLED
+                    MetadataConfigModule.CENTRALIZED_DATASOURCE_SCHEMA_ENABLED
                 )
             );
       }
     }
   }
-
-  protected static boolean isSegmentSchemaCacheEnabled(Properties properties)
-  {
-    return Boolean.parseBoolean(properties.getProperty(CENTRALIZED_DATASOURCE_SCHEMA_ENABLED));
-  }
 }
diff --git a/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
new file mode 100644
index 00000000000..6d501f9f768
--- /dev/null
+++ b/services/src/main/java/org/apache/druid/guice/MetadataManagerModule.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Module;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.metadata.MetadataRuleManagerConfig;
+import org.apache.druid.metadata.MetadataRuleManagerProvider;
+import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.metadata.SQLMetadataRuleManagerProvider;
+import org.apache.druid.metadata.SQLMetadataSupervisorManager;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
+import org.apache.druid.metadata.SqlSegmentsMetadataManagerProvider;
+import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.SqlSegmentMetadataReadOnlyTransactionFactory;
+import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
+import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
+import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
+import org.apache.druid.segment.metadata.NoopSegmentSchemaCache;
+import org.apache.druid.segment.metadata.SegmentSchemaCache;
+import org.apache.druid.server.coordinator.CoordinatorConfigManager;
+import org.apache.druid.server.coordinator.MetadataManager;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Module used by Overlord and Coordinator to bind the following metadata managers:
+ * <ul>
+ * <li>{@link MetadataManager} - Coordinator only</li>
+ * <li>{@link MetadataRuleManager} - Coordinator only</li>
+ * <li>{@link MetadataSupervisorManager}</li>
+ * <li>{@link SegmentsMetadataManager}</li>
+ * <li>{@link IndexerMetadataStorageCoordinator}</li>
+ * <li>{@link CoordinatorConfigManager}</li>
+ * <li>{@link SegmentMetadataCache}</li>
+ * <li>{@link SegmentSchemaCache} - Coordinator only</li>
+ * </ul>
+ */
+public class MetadataManagerModule implements Module
+{
+  // Populated through the injected configure(Properties, Set) below; cannot be
+  // final because the module instance exists before the injector does.
+  private Set<NodeRole> nodeRoles;
+  private boolean isSchemaCacheEnabled;
+
+  // NOTE(review): assumes Druid's module injection invokes this overload before
+  // configure(Binder) runs, so the fields are set when bindings are made — confirm.
+  @Inject
+  public void configure(
+      Properties properties,
+      @Self Set<NodeRole> nodeRoles
+  )
+  {
+    this.nodeRoles = nodeRoles;
+    this.isSchemaCacheEnabled = 
MetadataConfigModule.isSegmentSchemaCacheEnabled(properties);
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    // Common dependencies
+    binder.bind(CoordinatorConfigManager.class).in(LazySingleton.class);
+
+    binder.bind(MetadataSupervisorManager.class)
+          .to(SQLMetadataSupervisorManager.class)
+          .in(LazySingleton.class);
+
+    JsonConfigProvider.bind(binder, "druid.manager.segments", 
SegmentsMetadataManagerConfig.class);
+    binder.bind(SegmentsMetadataManagerProvider.class)
+          .to(SqlSegmentsMetadataManagerProvider.class)
+          .in(LazySingleton.class);
+    binder.bind(SegmentsMetadataManager.class)
+          .toProvider(SegmentsMetadataManagerProvider.class)
+          .in(ManageLifecycle.class);
+
+    binder.bind(IndexerMetadataStorageCoordinator.class)
+          .to(IndexerSQLMetadataStorageCoordinator.class)
+          .in(ManageLifecycle.class);
+    // HeapMemorySegmentMetadataCache now depends on SegmentSchemaCache, so a
+    // SegmentSchemaCache binding (real or noop, chosen below) must always exist.
+    binder.bind(SegmentMetadataCache.class)
+          .to(HeapMemorySegmentMetadataCache.class)
+          .in(LazySingleton.class);
+
+    // Coordinator-only dependencies
+    if (nodeRoles.contains(NodeRole.COORDINATOR)) {
+      JsonConfigProvider.bind(binder, "druid.manager.rules", 
MetadataRuleManagerConfig.class);
+      binder.bind(MetadataRuleManagerProvider.class)
+            .to(SQLMetadataRuleManagerProvider.class)
+            .in(LazySingleton.class);
+      binder.bind(MetadataRuleManager.class)
+            .toProvider(MetadataRuleManagerProvider.class)
+            .in(ManageLifecycle.class);
+
+      binder.bind(MetadataManager.class).in(LazySingleton.class);
+    }
+
+    // Only a Coordinator with centralized datasource schema enabled gets a real
+    // schema cache; every other node (including the Overlord) gets a noop.
+    if (nodeRoles.contains(NodeRole.COORDINATOR) && isSchemaCacheEnabled) {
+      binder.bind(SegmentSchemaCache.class).in(LazySingleton.class);
+    } else {
+      binder.bind(SegmentSchemaCache.class)
+            .to(NoopSegmentSchemaCache.class)
+            .in(LazySingleton.class);
+    }
+
+    // Overlord-only dependencies: the Overlord gets the read-write segment
+    // metadata transaction factory; all other roles get the read-only one.
+    if (nodeRoles.contains(NodeRole.OVERLORD)) {
+      binder.bind(SegmentMetadataTransactionFactory.class)
+            .to(SqlSegmentMetadataTransactionFactory.class)
+            .in(LazySingleton.class);
+    } else {
+      binder.bind(SegmentMetadataTransactionFactory.class)
+            .to(SqlSegmentMetadataReadOnlyTransactionFactory.class)
+            .in(LazySingleton.class);
+    }
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java b/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
index 377950b0370..744d56a89e0 100644
--- a/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
+++ b/services/src/main/java/org/apache/druid/guice/SegmentSchemaCacheModule.java
@@ -44,7 +44,6 @@ import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
 import org.apache.druid.segment.metadata.SegmentMetadataQuerySegmentWalker;
-import org.apache.druid.segment.metadata.SegmentSchemaCache;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QuerySchedulerProvider;
 
@@ -83,7 +82,6 @@ public class SegmentSchemaCacheModule implements Module
           .toProvider(Key.get(QuerySchedulerProvider.class, Global.class))
           .in(LazySingleton.class);
     binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
-    binder.bind(SegmentSchemaCache.class).in(LazySingleton.class);
     binder.bind(QuerySegmentWalker.class).to(SegmentMetadataQuerySegmentWalker.class).in(LazySingleton.class);
 
     LifecycleModule.register(binder, CoordinatorSegmentMetadataCache.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to