This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 64104533acf Enable querying entirely cold datasources  (#16676)
64104533acf is described below

commit 64104533acf744bf3b596a9770beb17aedc374df
Author: Rishabh Singh <[email protected]>
AuthorDate: Mon Jul 15 15:02:59 2024 +0530

    Enable querying entirely cold datasources  (#16676)
    
    Add ability to query entirely cold datasources.
---
 .../client/coordinator/CoordinatorClient.java      |   5 +
 .../client/coordinator/CoordinatorClientImpl.java  |  13 +
 .../metadata/AbstractSegmentMetadataCache.java     |  16 +-
 .../metadata/CoordinatorSegmentMetadataCache.java  | 220 +++++++++-
 .../druid/server/coordinator/DruidCoordinator.java |   3 +
 .../client/coordinator/NoopCoordinatorClient.java  |   6 +
 ...CoordinatorSegmentDataCacheConcurrencyTest.java |  23 +-
 .../CoordinatorSegmentMetadataCacheTest.java       | 479 ++++++++++++++++++++-
 .../calcite/schema/BrokerSegmentMetadataCache.java |  41 +-
 .../schema/BrokerSegmentMetadataCacheTest.java     |  89 +++-
 10 files changed, 853 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index fdf16b2ac50..edeb16665ba 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -69,4 +69,9 @@ public interface CoordinatorClient
    * Returns a new instance backed by a ServiceClient which follows the 
provided retryPolicy
    */
   CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy);
+
+  /**
+   * Retrieves list of datasources with used segments.
+   */
+  ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments();
 }
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
index 4c795c9dbd4..fc3deee12ed 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
@@ -188,4 +188,17 @@ public class CoordinatorClientImpl implements CoordinatorClient
   {
     return new CoordinatorClientImpl(client.withRetryPolicy(retryPolicy), 
jsonMapper);
   }
+
+  @Override
+  public ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments()
+  {
+    final String path = "/druid/coordinator/v1/metadata/datasources";
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.GET, path),
+            new BytesFullResponseHandler()
+        ),
+        holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new 
TypeReference<Set<String>>() {})
+    );
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index 9cb2297db82..88e6ee97b98 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -200,7 +200,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
    * Map of datasource and generic object extending DataSourceInformation.
    * This structure can be accessed by {@link #cacheExec} and {@link 
#callbackExec} threads.
    */
-  protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, T> tables = new 
ConcurrentHashMap<>();
 
   /**
    * This lock coordinates the access from multiple threads to those variables 
guarded by this lock.
@@ -269,9 +269,10 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
               final boolean wasRecentFailure = DateTimes.utc(lastFailure)
                                                         
.plus(config.getMetadataRefreshPeriod())
                                                         .isAfterNow();
+
               if (isServerViewInitialized &&
                   !wasRecentFailure &&
-                  (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty()) &&
+                  shouldRefresh() &&
                   (refreshImmediately || nextRefresh < 
System.currentTimeMillis())) {
                 // We need to do a refresh. Break out of the waiting loop.
                 break;
@@ -334,6 +335,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
     }
   }
 
+
   /**
    * Lifecycle start method.
    */
@@ -361,6 +363,15 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
     // noop
   }
 
+  /**
+   * Refresh is executed only when there are segments or datasources needing 
refresh.
+   */
+  @SuppressWarnings("GuardedBy")
+  protected boolean shouldRefresh()
+  {
+    return (!segmentsNeedingRefresh.isEmpty() || 
!dataSourcesNeedingRebuild.isEmpty());
+  }
+
   public void awaitInitialization() throws InterruptedException
   {
     initialized.await();
@@ -373,6 +384,7 @@ public abstract class AbstractSegmentMetadataCache<T extends DataSourceInformati
    *
    * @return schema information for the given datasource
    */
+  @Nullable
   public T getDatasource(String name)
   {
     return tables.get(name);
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index dad0b78ea77..3a4f548b8ba 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -20,19 +20,27 @@
 package org.apache.druid.segment.metadata;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import org.apache.druid.client.CoordinatorServerView;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.client.ServerView;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.segment.SchemaPayloadPlus;
@@ -41,21 +49,30 @@ import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
+import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
 import org.apache.druid.server.security.Escalator;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -71,17 +88,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * <ul><li>Metadata query is executed only for those non-realtime segments for 
which the schema is not cached.</li>
  * <li>Datasources marked for refresh are then rebuilt.</li></ul>
  * </li>
+ * <p>
+ * It is important to note that the datasource schema returned in {@link 
#getDatasource} & {@link #getDataSourceInformationMap()}
+ * also includes columns from cold segments.
+ * Cold segments are processed in a separate thread and datasource schema from 
cold segments is separately stored.
+ * </p>
  */
 @ManageLifecycle
 public class CoordinatorSegmentMetadataCache extends 
AbstractSegmentMetadataCache<DataSourceInformation>
 {
   private static final EmittingLogger log = new 
EmittingLogger(CoordinatorSegmentMetadataCache.class);
+  private static final Long COLD_SCHEMA_PERIOD_MULTIPLIER = 3L;
+  private static final Long COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS = 
TimeUnit.SECONDS.toMillis(50);
 
   private final SegmentMetadataCacheConfig config;
   private final ColumnTypeMergePolicy columnTypeMergePolicy;
   private final SegmentSchemaCache segmentSchemaCache;
   private final SegmentSchemaBackFillQueue segmentSchemaBackfillQueue;
+  private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private volatile SegmentReplicationStatus segmentReplicationStatus = null;
+
+  // Datasource schema built from only cold segments.
+  private final ConcurrentHashMap<String, DataSourceInformation> 
coldSchemaTable = new ConcurrentHashMap<>();
+
+  // Period for cold schema processing thread. This is a multiple of segment 
polling period.
+  // Cold schema processing runs slower than the segment poll to save 
processing cost of all segments.
+  // The downside is a delay in columns from cold segment reflecting in the 
datasource schema.
+  private final long coldSchemaExecPeriodMillis;
+  private final ScheduledExecutorService coldSchemaExec;
   private @Nullable Future<?> cacheExecFuture = null;
+  private @Nullable Future<?> coldSchemaExecFuture = null;
 
   @Inject
   public CoordinatorSegmentMetadataCache(
@@ -92,7 +128,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
       InternalQueryConfig internalQueryConfig,
       ServiceEmitter emitter,
       SegmentSchemaCache segmentSchemaCache,
-      SegmentSchemaBackFillQueue segmentSchemaBackfillQueue
+      SegmentSchemaBackFillQueue segmentSchemaBackfillQueue,
+      SqlSegmentsMetadataManager sqlSegmentsMetadataManager,
+      Supplier<SegmentsMetadataManagerConfig> 
segmentsMetadataManagerConfigSupplier
   )
   {
     super(queryLifecycleFactory, config, escalator, internalQueryConfig, 
emitter);
@@ -100,6 +138,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     this.columnTypeMergePolicy = config.getMetadataColumnTypeMergePolicy();
     this.segmentSchemaCache = segmentSchemaCache;
     this.segmentSchemaBackfillQueue = segmentSchemaBackfillQueue;
+    this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
+    this.coldSchemaExecPeriodMillis =
+        
segmentsMetadataManagerConfigSupplier.get().getPollDuration().getMillis() * 
COLD_SCHEMA_PERIOD_MULTIPLIER;
+    coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder()
+            .setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
+            .setDaemon(false)
+            .build()
+    );
 
     initServerViewTimelineCallback(serverView);
   }
@@ -168,11 +215,15 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
   {
     callbackExec.shutdownNow();
     cacheExec.shutdownNow();
+    coldSchemaExec.shutdownNow();
     segmentSchemaCache.onLeaderStop();
     segmentSchemaBackfillQueue.onLeaderStop();
     if (cacheExecFuture != null) {
       cacheExecFuture.cancel(true);
     }
+    if (coldSchemaExecFuture != null) {
+      coldSchemaExecFuture.cancel(true);
+    }
   }
 
   public void onLeaderStart()
@@ -181,6 +232,12 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     try {
       segmentSchemaBackfillQueue.onLeaderStart();
       cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
+      coldSchemaExecFuture = coldSchemaExec.schedule(
+          this::coldDatasourceSchemaExec,
+          coldSchemaExecPeriodMillis,
+          TimeUnit.MILLISECONDS
+      );
+
       if (config.isAwaitInitializationOnStart()) {
         awaitInitialization();
       }
@@ -196,6 +253,9 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     if (cacheExecFuture != null) {
       cacheExecFuture.cancel(true);
     }
+    if (coldSchemaExecFuture != null) {
+      coldSchemaExecFuture.cancel(true);
+    }
     segmentSchemaCache.onLeaderStop();
     segmentSchemaBackfillQueue.onLeaderStop();
   }
@@ -209,6 +269,11 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     segmentSchemaCache.awaitInitialization();
   }
 
+  public void updateSegmentReplicationStatus(SegmentReplicationStatus 
segmentReplicationStatus)
+  {
+    this.segmentReplicationStatus = segmentReplicationStatus;
+  }
+
   @Override
   protected void unmarkSegmentAsMutable(SegmentId segmentId)
   {
@@ -336,6 +401,62 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     return availableSegmentMetadata;
   }
 
+  @Override
+  public DataSourceInformation getDatasource(String name)
+  {
+    return getMergedDatasourceInformation(tables.get(name), 
coldSchemaTable.get(name)).orElse(null);
+  }
+
+  @Override
+  public Map<String, DataSourceInformation> getDataSourceInformationMap()
+  {
+    Map<String, DataSourceInformation> hot = new HashMap<>(tables);
+    Map<String, DataSourceInformation> cold = new HashMap<>(coldSchemaTable);
+    Set<String> combinedDatasources = new HashSet<>(hot.keySet());
+    combinedDatasources.addAll(cold.keySet());
+    ImmutableMap.Builder<String, DataSourceInformation> combined = 
ImmutableMap.builder();
+
+    for (String dataSource : combinedDatasources) {
+      getMergedDatasourceInformation(hot.get(dataSource), cold.get(dataSource))
+          .ifPresent(merged -> combined.put(
+              dataSource,
+              merged
+          ));
+    }
+
+    return combined.build();
+  }
+
+  private Optional<DataSourceInformation> getMergedDatasourceInformation(
+      final DataSourceInformation hot,
+      final DataSourceInformation cold
+  )
+  {
+    if (hot == null && cold == null) {
+      return Optional.empty();
+    } else if (hot != null && cold == null) {
+      return Optional.of(hot);
+    } else if (hot == null && cold != null) {
+      return Optional.of(cold);
+    } else {
+      final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+      List<RowSignature> signatures = new ArrayList<>();
+      // hot datasource schema takes precedence
+      signatures.add(hot.getRowSignature());
+      signatures.add(cold.getRowSignature());
+
+      for (RowSignature signature : signatures) {
+        mergeRowSignature(columnTypes, signature);
+      }
+
+      final RowSignature.Builder builder = RowSignature.builder();
+      columnTypes.forEach(builder::add);
+
+      return Optional.of(new DataSourceInformation(hot.getDataSource(), 
builder.build()));
+    }
+  }
+
   /**
    * Executes SegmentMetadataQuery to fetch schema information for each 
segment in the refresh list.
    * The schema information for individual segments is combined to construct a 
table schema, which is then cached.
@@ -382,6 +503,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     // Rebuild the datasources.
     for (String dataSource : dataSourcesToRebuild) {
       final RowSignature rowSignature = 
buildDataSourceRowSignature(dataSource);
+
       if (rowSignature == null) {
         log.info("RowSignature null for dataSource [%s], implying that it no 
longer exists. All metadata removed.", dataSource);
         tables.remove(dataSource);
@@ -419,6 +541,94 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
     return cachedSegments;
   }
 
+  @Nullable
+  private Integer getReplicationFactor(SegmentId segmentId)
+  {
+    if (segmentReplicationStatus == null) {
+      return null;
+    }
+    SegmentReplicaCount replicaCountsInCluster = 
segmentReplicationStatus.getReplicaCountsInCluster(segmentId);
+    return replicaCountsInCluster == null ? null : 
replicaCountsInCluster.required();
+  }
+
+  @VisibleForTesting
+  protected void coldDatasourceSchemaExec()
+  {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+
+    Set<String> dataSourceWithColdSegmentSet = new HashSet<>();
+
+    int datasources = 0;
+    int segments = 0;
+    int dataSourceWithColdSegments = 0;
+
+    Collection<ImmutableDruidDataSource> immutableDataSources =
+        
sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments();
+
+    for (ImmutableDruidDataSource dataSource : immutableDataSources) {
+      datasources++;
+      Collection<DataSegment> dataSegments = dataSource.getSegments();
+
+      final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
+
+      for (DataSegment segment : dataSegments) {
+        Integer replicationFactor = getReplicationFactor(segment.getId());
+        if (replicationFactor != null && replicationFactor != 0) {
+          continue;
+        }
+        Optional<SchemaPayloadPlus> optionalSchema = 
segmentSchemaCache.getSchemaForSegment(segment.getId());
+        if (optionalSchema.isPresent()) {
+          RowSignature rowSignature = 
optionalSchema.get().getSchemaPayload().getRowSignature();
+          mergeRowSignature(columnTypes, rowSignature);
+        }
+        segments++;
+      }
+
+      if (columnTypes.isEmpty()) {
+        // this datasource doesn't have any cold segment
+        continue;
+      }
+
+      final RowSignature.Builder builder = RowSignature.builder();
+      columnTypes.forEach(builder::add);
+
+      RowSignature coldSignature = builder.build();
+
+      String dataSourceName = dataSource.getName();
+      dataSourceWithColdSegmentSet.add(dataSourceName);
+      dataSourceWithColdSegments++;
+
+      log.debug("[%s] signature from cold segments is [%s]", dataSourceName, 
coldSignature);
+
+      coldSchemaTable.put(dataSourceName, new 
DataSourceInformation(dataSourceName, coldSignature));
+    }
+
+    // remove any stale datasource from the map
+    coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);
+
+    String executionStatsLog = StringUtils.format(
+        "Cold schema processing took [%d] millis. "
+        + "Processed total [%d] datasources, [%d] segments. Found [%d] 
datasources with cold segments.",
+        stopwatch.millisElapsed(), datasources, segments, 
dataSourceWithColdSegments
+    );
+    if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
+      log.info(executionStatsLog);
+    } else {
+      log.debug(executionStatsLog);
+    }
+  }
+
+  private void mergeRowSignature(final Map<String, ColumnType> columnTypes, 
final RowSignature signature)
+  {
+    for (String column : signature.getColumnNames()) {
+      final ColumnType columnType =
+          signature.getColumnType(column)
+                   .orElseThrow(() -> new ISE("Encountered null type for 
column [%s]", column));
+
+      columnTypes.compute(column, (c, existingType) -> 
columnTypeMergePolicy.merge(existingType, columnType));
+    }
+  }
+
   @VisibleForTesting
   @Nullable
   @Override
@@ -434,13 +644,7 @@ public class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCach
         Optional<SchemaPayloadPlus> optionalSchema = 
segmentSchemaCache.getSchemaForSegment(segmentId);
         if (optionalSchema.isPresent()) {
           RowSignature rowSignature = 
optionalSchema.get().getSchemaPayload().getRowSignature();
-          for (String column : rowSignature.getColumnNames()) {
-            final ColumnType columnType =
-                rowSignature.getColumnType(column)
-                            .orElseThrow(() -> new ISE("Encountered null type 
for column [%s]", column));
-
-            columnTypes.compute(column, (c, existingType) -> 
columnTypeMergePolicy.merge(existingType, columnType));
-          }
+          mergeRowSignature(columnTypes, rowSignature);
         } else {
           // mark it for refresh, however, this case shouldn't arise by design
           markSegmentAsNeedRefresh(segmentId);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 36cfac8089c..9710bda79b4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -816,6 +816,9 @@ public class DruidCoordinator
     {
       broadcastSegments = params.getBroadcastSegments();
       segmentReplicationStatus = params.getSegmentReplicationStatus();
+      if (coordinatorSegmentMetadataCache != null) {
+        
coordinatorSegmentMetadataCache.updateSegmentReplicationStatus(segmentReplicationStatus);
+      }
 
       // Collect stats for unavailable and under-replicated segments
       final CoordinatorRunStats stats = params.getCoordinatorStats();
diff --git a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
index 5aee343a851..58f5af58a3e 100644
--- a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
+++ b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
@@ -75,4 +75,10 @@ public class NoopCoordinatorClient implements CoordinatorClient
     // Ignore retryPolicy for the test client.
     return this;
   }
+
+  @Override
+  public ListenableFuture<Set<String>> fetchDataSourcesWithUsedSegments()
+  {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
index 81f65acf84a..4cc4ac38184 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
@@ -20,6 +20,8 @@
 package org.apache.druid.segment.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.apache.druid.client.BrokerServerView;
@@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.TableDataSource;
@@ -61,16 +65,19 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
+import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -101,6 +108,8 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
   private TestSegmentMetadataQueryWalker walker;
   private SegmentSchemaCache segmentSchemaCache;
   private SegmentSchemaBackFillQueue backFillQueue;
+  private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private Supplier<SegmentsMetadataManagerConfig> 
segmentsMetadataManagerConfigSupplier;
   private final ObjectMapper mapper = TestHelper.makeJsonMapper();
 
   @Before
@@ -190,6 +199,12 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
         }
     );
 
+    sqlSegmentsMetadataManager = 
Mockito.mock(SqlSegmentsMetadataManager.class);
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = 
Mockito.mock(SegmentsMetadataManagerConfig.class);
+    
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    segmentsMetadataManagerConfigSupplier = 
Suppliers.ofInstance(metadataManagerConfig);
+
     inventoryView.init();
     initLatch.await();
     exec = Execs.multiThreaded(4, "DruidSchemaConcurrencyTest-%d");
@@ -227,7 +242,9 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -341,7 +358,9 @@ public class CoordinatorSegmentDataCacheConcurrencyTest extends SegmentMetadataC
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index e5b6db1d42d..ef1fb1e8edd 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -22,11 +22,14 @@ package org.apache.druid.segment.metadata;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.Intervals;
@@ -37,6 +40,8 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.QueryContexts;
@@ -66,6 +71,8 @@ import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryResponse;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
+import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AllowAllAuthenticator;
@@ -74,18 +81,23 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.easymock.EasyMock;
+import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 import org.skife.jdbi.v2.StatementContext;
 
 import java.io.File;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -106,12 +118,19 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
   private CoordinatorSegmentMetadataCache runningSchema;
   private CountDownLatch buildTableLatch = new CountDownLatch(1);
   private CountDownLatch markDataSourceLatch = new CountDownLatch(1);
+  private SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
+  private Supplier<SegmentsMetadataManagerConfig> 
segmentsMetadataManagerConfigSupplier;
 
   @Before
   @Override
   public void setUp() throws Exception
   {
     super.setUp();
+    sqlSegmentsMetadataManager = 
Mockito.mock(SqlSegmentsMetadataManager.class);
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = 
Mockito.mock(SegmentsMetadataManagerConfig.class);
+    
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    segmentsMetadataManagerConfigSupplier = 
Suppliers.ofInstance(metadataManagerConfig);
   }
 
   @After
@@ -132,6 +151,7 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
   public CoordinatorSegmentMetadataCache 
buildSchemaMarkAndTableLatch(SegmentMetadataCacheConfig config) throws 
InterruptedException
   {
     Preconditions.checkState(runningSchema == null);
+
     runningSchema = new CoordinatorSegmentMetadataCache(
         getQueryLifecycleFactory(walker),
         serverView,
@@ -140,7 +160,9 @@ public class CoordinatorSegmentMetadataCacheTest extends CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -178,7 +200,7 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
   public void testGetTableMapFoo() throws InterruptedException
   {
     CoordinatorSegmentMetadataCache schema = buildSchemaMarkAndTableLatch();
-    verifyFooDSSchema(schema);
+    verifyFooDSSchema(schema, 6);
   }
 
   @Test
@@ -312,7 +334,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -523,7 +547,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -558,6 +584,11 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
   {
     String datasource = "newSegmentAddTest";
     CountDownLatch addSegmentLatch = new CountDownLatch(2);
+    SqlSegmentsMetadataManager sqlSegmentsMetadataManager = 
Mockito.mock(SqlSegmentsMetadataManager.class);
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = 
Mockito.mock(SegmentsMetadataManagerConfig.class);
+    
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    Supplier<SegmentsMetadataManagerConfig> 
segmentsMetadataManagerConfigSupplier = 
Suppliers.ofInstance(metadataManagerConfig);
     CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
         getQueryLifecycleFactory(walker),
         serverView,
@@ -566,7 +597,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -605,6 +638,11 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
   {
     String datasource = "newSegmentAddTest";
     CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    SqlSegmentsMetadataManager sqlSegmentsMetadataManager = 
Mockito.mock(SqlSegmentsMetadataManager.class);
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = 
Mockito.mock(SegmentsMetadataManagerConfig.class);
+    
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    Supplier<SegmentsMetadataManagerConfig> 
segmentsMetadataManagerConfigSupplier = 
Suppliers.ofInstance(metadataManagerConfig);
     CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
         getQueryLifecycleFactory(walker),
         serverView,
@@ -613,7 +651,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -649,6 +689,11 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
   {
     String datasource = "newSegmentAddTest";
     CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    SqlSegmentsMetadataManager sqlSegmentsMetadataManager = 
Mockito.mock(SqlSegmentsMetadataManager.class);
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).thenReturn(Collections.emptyList());
+    SegmentsMetadataManagerConfig metadataManagerConfig = 
Mockito.mock(SegmentsMetadataManagerConfig.class);
+    
Mockito.when(metadataManagerConfig.getPollDuration()).thenReturn(Period.millis(1000));
+    Supplier<SegmentsMetadataManagerConfig> 
segmentsMetadataManagerConfigSupplier = 
Suppliers.ofInstance(metadataManagerConfig);
     CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
         getQueryLifecycleFactory(walker),
         serverView,
@@ -657,7 +702,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -698,7 +745,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -756,7 +805,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -817,7 +868,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -852,7 +905,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -900,7 +955,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -972,7 +1029,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         internalQueryConfig,
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     );
 
     Map<String, Object> queryContext = ImmutableMap.of(
@@ -1141,7 +1200,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         emitter,
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     )
     {
       @Override
@@ -1306,7 +1367,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     ) {
       @Override
       void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas)
@@ -1385,7 +1448,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     ) {
       @Override
       public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> 
dataSourcesToRebuild)
@@ -1565,7 +1630,9 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
         segmentSchemaCache,
-        backFillQueue
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
     ) {
       @Override
       public Set<SegmentId> refreshSegmentsForDataSource(String dataSource, 
Set<SegmentId> segments)
@@ -1594,7 +1661,7 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
     Assert.assertEquals(0, refreshCount.get());
 
     // verify that datasource schema is built
-    verifyFooDSSchema(schema);
+    verifyFooDSSchema(schema, 6);
 
     serverView.addSegment(segment3, ServerType.HISTORICAL);
 
@@ -1721,12 +1788,384 @@ public class CoordinatorSegmentMetadataCacheTest 
extends CoordinatorSegmentMetad
     Assert.assertEquals(existingMetadata.getNumReplicas(), 
currentMetadata.getNumReplicas());
   }
 
-  private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema)
+  private CoordinatorSegmentMetadataCache setupForColdDatasourceSchemaTest()
+  {
+    // foo has both hot and cold segments
+    DataSegment coldSegment =
+        DataSegment.builder()
+                   .dataSource(DATASOURCE1)
+                   .interval(Intervals.of("1998/P2Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    // cold has only cold segments
+    DataSegment singleColdSegment =
+        DataSegment.builder()
+                   .dataSource("cold")
+                   .interval(Intervals.of("2000/P2Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new 
ImmutableMap.Builder<>();
+    segmentStatsMap.put(coldSegment.getId(), new SegmentMetadata(20L, 
"foo-fingerprint"));
+    segmentStatsMap.put(singleColdSegment.getId(), new SegmentMetadata(20L, 
"cold-fingerprint"));
+    ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new 
ImmutableMap.Builder<>();
+    schemaPayloadMap.put(
+        "foo-fingerprint",
+        new SchemaPayload(RowSignature.builder()
+                                      .add("dim1", ColumnType.STRING)
+                                      .add("c1", ColumnType.STRING)
+                                      .add("c2", ColumnType.LONG)
+                                      .build())
+    );
+    schemaPayloadMap.put(
+        "cold-fingerprint",
+        new SchemaPayload(
+            RowSignature.builder()
+                              .add("f1", ColumnType.STRING)
+                              .add("f2", ColumnType.DOUBLE)
+                              .build()
+        )
+    );
+
+    segmentSchemaCache.updateFinalizedSegmentSchema(
+        new 
SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), 
schemaPayloadMap.build())
+    );
+
+    List<ImmutableDruidDataSource> druidDataSources = new ArrayList<>();
+    Map<SegmentId, DataSegment> segmentMap = new HashMap<>();
+    segmentMap.put(coldSegment.getId(), coldSegment);
+    segmentMap.put(segment1.getId(), segment1);
+    segmentMap.put(segment2.getId(), segment2);
+    druidDataSources.add(new ImmutableDruidDataSource(
+        coldSegment.getDataSource(),
+        Collections.emptyMap(),
+        segmentMap
+    ));
+    druidDataSources.add(new ImmutableDruidDataSource(
+        singleColdSegment.getDataSource(),
+        Collections.emptyMap(),
+        Collections.singletonMap(singleColdSegment.getId(), singleColdSegment)
+    ));
+
+    Mockito.when(
+               
sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
+           .thenReturn(druidDataSources);
+
+    CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        segmentSchemaCache,
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    );
+
+    SegmentReplicaCount zeroSegmentReplicaCount = 
Mockito.mock(SegmentReplicaCount.class);
+    SegmentReplicaCount nonZeroSegmentReplicaCount = 
Mockito.mock(SegmentReplicaCount.class);
+    Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0);
+    Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1);
+    SegmentReplicationStatus segmentReplicationStatus = 
Mockito.mock(SegmentReplicationStatus.class);
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegment.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(singleColdSegment.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment1.getId())))
+           .thenReturn(nonZeroSegmentReplicaCount);
+
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(segment2.getId())))
+           .thenReturn(nonZeroSegmentReplicaCount);
+
+    schema.updateSegmentReplicationStatus(segmentReplicationStatus);
+    schema.updateSegmentReplicationStatus(segmentReplicationStatus);
+
+    return schema;
+  }
+
+  @Test
+  public void testColdDatasourceSchema_refreshAfterColdSchemaExec() throws 
IOException
+  {
+    CoordinatorSegmentMetadataCache schema = 
setupForColdDatasourceSchemaTest();
+
+    schema.coldDatasourceSchemaExec();
+
+    Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), 
schema.getDataSourceInformationMap().keySet());
+
+    // verify that cold schema for both foo and cold is present
+    RowSignature fooSignature = schema.getDatasource("foo").getRowSignature();
+    List<String> columnNames = fooSignature.getColumnNames();
+
+    // verify that foo schema doesn't contain columns from hot segments
+    Assert.assertEquals(3, columnNames.size());
+
+    Assert.assertEquals("dim1", columnNames.get(0));
+    Assert.assertEquals(ColumnType.STRING, 
fooSignature.getColumnType(columnNames.get(0)).get());
+
+    Assert.assertEquals("c1", columnNames.get(1));
+    Assert.assertEquals(ColumnType.STRING, 
fooSignature.getColumnType(columnNames.get(1)).get());
+
+    Assert.assertEquals("c2", columnNames.get(2));
+    Assert.assertEquals(ColumnType.LONG, 
fooSignature.getColumnType(columnNames.get(2)).get());
+
+    RowSignature coldSignature = 
schema.getDatasource("cold").getRowSignature();
+    columnNames = coldSignature.getColumnNames();
+    Assert.assertEquals("f1", columnNames.get(0));
+    Assert.assertEquals(ColumnType.STRING, 
coldSignature.getColumnType(columnNames.get(0)).get());
+
+    Assert.assertEquals("f2", columnNames.get(1));
+    Assert.assertEquals(ColumnType.DOUBLE, 
coldSignature.getColumnType(columnNames.get(1)).get());
+
+    Set<SegmentId> segmentIds = new HashSet<>();
+    segmentIds.add(segment1.getId());
+    segmentIds.add(segment2.getId());
+
+    schema.refresh(segmentIds, new HashSet<>());
+
+    Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), 
schema.getDataSourceInformationMap().keySet());
+
+    coldSignature = schema.getDatasource("cold").getRowSignature();
+    columnNames = coldSignature.getColumnNames();
+    Assert.assertEquals("f1", columnNames.get(0));
+    Assert.assertEquals(ColumnType.STRING, 
coldSignature.getColumnType(columnNames.get(0)).get());
+
+    Assert.assertEquals("f2", columnNames.get(1));
+    Assert.assertEquals(ColumnType.DOUBLE, 
coldSignature.getColumnType(columnNames.get(1)).get());
+
+    // foo now contains schema from both hot and cold segments
+    verifyFooDSSchema(schema, 8);
+    RowSignature rowSignature = schema.getDatasource("foo").getRowSignature();
+
+    // cold columns should be present at the end
+    columnNames = rowSignature.getColumnNames();
+    Assert.assertEquals("c1", columnNames.get(6));
+    Assert.assertEquals(ColumnType.STRING, 
rowSignature.getColumnType(columnNames.get(6)).get());
+
+    Assert.assertEquals("c2", columnNames.get(7));
+    Assert.assertEquals(ColumnType.LONG, 
rowSignature.getColumnType(columnNames.get(7)).get());
+  }
+
+  @Test
+  public void testColdDatasourceSchema_coldSchemaExecAfterRefresh() throws 
IOException
+  {
+    CoordinatorSegmentMetadataCache schema = 
setupForColdDatasourceSchemaTest();
+
+    Set<SegmentId> segmentIds = new HashSet<>();
+    segmentIds.add(segment1.getId());
+    segmentIds.add(segment2.getId());
+
+    schema.refresh(segmentIds, new HashSet<>());
+    // cold datasource shouldn't be present
+    Assert.assertEquals(Collections.singleton("foo"), 
schema.getDataSourceInformationMap().keySet());
+
+    // cold columns shouldn't be present
+    verifyFooDSSchema(schema, 6);
+    Assert.assertNull(schema.getDatasource("cold"));
+
+    schema.coldDatasourceSchemaExec();
+
+    // cold datasource should be present now
+    Assert.assertEquals(new HashSet<>(Arrays.asList("foo", "cold")), 
schema.getDataSourceInformationMap().keySet());
+
+    RowSignature coldSignature = 
schema.getDatasource("cold").getRowSignature();
+    List<String> columnNames = coldSignature.getColumnNames();
+    Assert.assertEquals("f1", columnNames.get(0));
+    Assert.assertEquals(ColumnType.STRING, 
coldSignature.getColumnType(columnNames.get(0)).get());
+
+    Assert.assertEquals("f2", columnNames.get(1));
+    Assert.assertEquals(ColumnType.DOUBLE, 
coldSignature.getColumnType(columnNames.get(1)).get());
+
+    // columns from the cold segment of foo should be present
+    verifyFooDSSchema(schema, 8);
+    RowSignature rowSignature = schema.getDatasource("foo").getRowSignature();
+
+    columnNames = rowSignature.getColumnNames();
+    Assert.assertEquals("c1", columnNames.get(6));
+    Assert.assertEquals(ColumnType.STRING, 
rowSignature.getColumnType(columnNames.get(6)).get());
+
+    Assert.assertEquals("c2", columnNames.get(7));
+    Assert.assertEquals(ColumnType.LONG, 
rowSignature.getColumnType(columnNames.get(7)).get());
+  }
+
+  @Test
+  public void testColdDatasourceSchema_verifyStaleDatasourceRemoved()
+  {
+    DataSegment coldSegmentAlpha =
+        DataSegment.builder()
+                   .dataSource("alpha")
+                   .interval(Intervals.of("2000/P2Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    DataSegment coldSegmentBeta =
+        DataSegment.builder()
+                   .dataSource("beta")
+                   .interval(Intervals.of("2000/P2Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    DataSegment coldSegmentGamma =
+        DataSegment.builder()
+            .dataSource("gamma")
+            .interval(Intervals.of("2000/P2Y"))
+            .version("1")
+            .shardSpec(new LinearShardSpec(0))
+            .size(0)
+            .build();
+
+    DataSegment hotSegmentGamma =
+        DataSegment.builder()
+                   .dataSource("gamma")
+                   .interval(Intervals.of("2001/P2Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentStatsMap = new 
ImmutableMap.Builder<>();
+    segmentStatsMap.put(coldSegmentAlpha.getId(), new SegmentMetadata(20L, 
"cold"));
+    segmentStatsMap.put(coldSegmentBeta.getId(), new SegmentMetadata(20L, 
"cold"));
+    segmentStatsMap.put(hotSegmentGamma.getId(), new SegmentMetadata(20L, 
"hot"));
+    segmentStatsMap.put(coldSegmentGamma.getId(), new SegmentMetadata(20L, 
"cold"));
+
+    ImmutableMap.Builder<String, SchemaPayload> schemaPayloadMap = new 
ImmutableMap.Builder<>();
+    schemaPayloadMap.put(
+        "cold",
+        new SchemaPayload(RowSignature.builder()
+                                      .add("dim1", ColumnType.STRING)
+                                      .add("c1", ColumnType.STRING)
+                                      .add("c2", ColumnType.LONG)
+                                      .build())
+    );
+    schemaPayloadMap.put(
+        "hot",
+        new SchemaPayload(RowSignature.builder()
+                                      .add("c3", ColumnType.STRING)
+                                      .add("c4", ColumnType.STRING)
+                                      .build())
+    );
+    segmentSchemaCache.updateFinalizedSegmentSchema(
+        new 
SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), 
schemaPayloadMap.build())
+    );
+
+    List<ImmutableDruidDataSource> druidDataSources = new ArrayList<>();
+    druidDataSources.add(
+        new ImmutableDruidDataSource(
+            "alpha",
+            Collections.emptyMap(),
+            Collections.singletonMap(coldSegmentAlpha.getId(), 
coldSegmentAlpha)
+        )
+    );
+
+    Map<SegmentId, DataSegment> gammaSegments = new HashMap<>();
+    gammaSegments.put(hotSegmentGamma.getId(), hotSegmentGamma);
+    gammaSegments.put(coldSegmentGamma.getId(), coldSegmentGamma);
+
+    druidDataSources.add(
+        new ImmutableDruidDataSource(
+            "gamma",
+            Collections.emptyMap(),
+            gammaSegments
+        )
+    );
+
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
+           .thenReturn(druidDataSources);
+
+    CoordinatorSegmentMetadataCache schema = new 
CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        segmentSchemaCache,
+        backFillQueue,
+        sqlSegmentsMetadataManager,
+        segmentsMetadataManagerConfigSupplier
+    );
+
+    SegmentReplicaCount zeroSegmentReplicaCount = 
Mockito.mock(SegmentReplicaCount.class);
+    SegmentReplicaCount nonZeroSegmentReplicaCount = 
Mockito.mock(SegmentReplicaCount.class);
+    Mockito.when(zeroSegmentReplicaCount.required()).thenReturn(0);
+    Mockito.when(nonZeroSegmentReplicaCount.required()).thenReturn(1);
+    SegmentReplicationStatus segmentReplicationStatus = 
Mockito.mock(SegmentReplicationStatus.class);
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentAlpha.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentBeta.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(coldSegmentGamma.getId())))
+           .thenReturn(zeroSegmentReplicaCount);
+
+    
Mockito.when(segmentReplicationStatus.getReplicaCountsInCluster(ArgumentMatchers.eq(hotSegmentGamma.getId())))
+           .thenReturn(nonZeroSegmentReplicaCount);
+
+    schema.updateSegmentReplicationStatus(segmentReplicationStatus);
+
+    schema.coldDatasourceSchemaExec();
+    // alpha has only 1 cold segment
+    Assert.assertNotNull(schema.getDatasource("alpha"));
+    // gamma has both hot and cold segments
+    Assert.assertNotNull(schema.getDatasource("gamma"));
+    // assert that cold schema for gamma doesn't contain any columns from hot 
segment
+    RowSignature rowSignature = 
schema.getDatasource("gamma").getRowSignature();
+    Assert.assertTrue(rowSignature.contains("dim1"));
+    Assert.assertTrue(rowSignature.contains("c1"));
+    Assert.assertTrue(rowSignature.contains("c2"));
+    Assert.assertFalse(rowSignature.contains("c3"));
+    Assert.assertFalse(rowSignature.contains("c4"));
+
+    Assert.assertEquals(new HashSet<>(Arrays.asList("alpha", "gamma")), 
schema.getDataSourceInformationMap().keySet());
+
+    druidDataSources.clear();
+    druidDataSources.add(
+        new ImmutableDruidDataSource(
+            "beta",
+            Collections.emptyMap(),
+            Collections.singletonMap(coldSegmentBeta.getId(), coldSegmentBeta)
+        )
+    );
+
+    druidDataSources.add(
+        new ImmutableDruidDataSource(
+            "gamma",
+            Collections.emptyMap(),
+            Collections.singletonMap(hotSegmentGamma.getId(), hotSegmentGamma)
+        )
+    );
+
+    
Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments())
+           .thenReturn(druidDataSources);
+
+    schema.coldDatasourceSchemaExec();
+    Assert.assertNotNull(schema.getDatasource("beta"));
+    // alpha doesn't have any segments
+    Assert.assertNull(schema.getDatasource("alpha"));
+    // gamma just has 1 hot segment
+    Assert.assertNull(schema.getDatasource("gamma"));
+
+    Assert.assertNull(schema.getDatasource("doesnotexist"));
+
+    Assert.assertEquals(Collections.singleton("beta"), 
schema.getDataSourceInformationMap().keySet());
+  }
+
+  private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int 
columns)
   {
     final DataSourceInformation fooDs = schema.getDatasource("foo");
     final RowSignature fooRowSignature = fooDs.getRowSignature();
     List<String> columnNames = fooRowSignature.getColumnNames();
-    Assert.assertEquals(6, columnNames.size());
+    Assert.assertEquals(columns, columnNames.size());
 
     Assert.assertEquals("__time", columnNames.get(0));
     Assert.assertEquals(ColumnType.LONG, 
fooRowSignature.getColumnType(columnNames.get(0)).get());
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
index 7974ed460eb..628b6ea3978 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
@@ -173,6 +173,16 @@ public class BrokerSegmentMetadataCache extends 
AbstractSegmentMetadataCache<Phy
     callbackExec.shutdownNow();
   }
 
+  /**
+   * Execute refresh on the broker in each cycle if 
CentralizedDatasourceSchema is enabled;
+   * otherwise, only if there are segments or datasources to be refreshed.
+   */
+  @Override
+  protected boolean shouldRefresh()
+  {
+    return centralizedDatasourceSchemaConfig.isEnabled() || 
super.shouldRefresh();
+  }
+
   /**
    * Refreshes the set of segments in two steps:
    * <ul>
@@ -196,6 +206,11 @@ public class BrokerSegmentMetadataCache extends 
AbstractSegmentMetadataCache<Phy
     // segmentMetadataInfo keys should be a superset of all other sets 
including datasources to refresh
     final Set<String> dataSourcesToQuery = new 
HashSet<>(segmentMetadataInfo.keySet());
 
+    // this is the complete set of datasources polled from the Coordinator
+    final Set<String> polledDatasources = queryDataSources();
+
+    dataSourcesToQuery.addAll(polledDatasources);
+
     log.debug("Querying schema for [%s] datasources from Coordinator.", 
dataSourcesToQuery);
 
     // Fetch datasource information from the Coordinator
@@ -227,14 +242,7 @@ public class BrokerSegmentMetadataCache extends 
AbstractSegmentMetadataCache<Phy
       // Remove those datasource for which we received schema from the 
Coordinator.
       dataSourcesToRebuild.removeAll(polledDataSourceMetadata.keySet());
 
-      if (centralizedDatasourceSchemaConfig.isEnabled()) {
-        // this is a hacky way to ensure refresh is executed even if there are 
no new segments to refresh
-        // once, CentralizedDatasourceSchema feature is GA, brokers should 
simply poll schema for all datasources
-        dataSourcesNeedingRebuild.addAll(segmentMetadataInfo.keySet());
-      } else {
-        dataSourcesNeedingRebuild.clear();
-      }
-      log.debug("DatasourcesNeedingRebuild are [%s]", 
dataSourcesNeedingRebuild);
+      dataSourcesNeedingRebuild.clear();
     }
 
     // Rebuild the datasources.
@@ -267,6 +275,23 @@ public class BrokerSegmentMetadataCache extends 
AbstractSegmentMetadataCache<Phy
     // noop, no additional action needed when segment is removed.
   }
 
+  private Set<String> queryDataSources()
+  {
+    Set<String> dataSources = new HashSet<>();
+
+    try {
+      Set<String> polled = 
FutureUtils.getUnchecked(coordinatorClient.fetchDataSourcesWithUsedSegments(), 
true);
+      if (polled != null) {
+        dataSources.addAll(polled);
+      }
+    }
+    catch (Exception e) {
+      log.debug(e, "Failed to query datasources from the Coordinator.");
+    }
+
+    return dataSources;
+  }
+
   private Map<String, PhysicalDatasourceMetadata> 
queryDataSourceInformation(Set<String> dataSourcesToQuery)
   {
     Stopwatch stopwatch = Stopwatch.createStarted();
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index 23b2759286c..65610ce99f2 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -74,11 +74,13 @@ import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.sql.calcite.table.DatasourceTable;
 import org.apache.druid.sql.calcite.table.DruidTable;
 import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.TestTimelineServerView;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
+import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -332,6 +334,9 @@ public class BrokerSegmentMetadataCacheTest extends 
BrokerSegmentMetadataCacheTe
     ArgumentCaptor<Set<String>> argumentCaptor = 
ArgumentCaptor.forClass(Set.class);
     CoordinatorClient coordinatorClient = 
Mockito.mock(CoordinatorClient.class);
     
Mockito.when(coordinatorClient.fetchDataSourceInformation(argumentCaptor.capture())).thenReturn(Futures.immediateFuture(null));
+
+    Set<String> datsources = Sets.newHashSet(DATASOURCE1, DATASOURCE2, 
DATASOURCE3, SOME_DATASOURCE, "xyz", "coldDS");
+    
Mockito.when(coordinatorClient.fetchDataSourcesWithUsedSegments()).thenReturn(Futures.immediateFuture(datsources));
     BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         serverView,
@@ -347,7 +352,7 @@ public class BrokerSegmentMetadataCacheTest extends 
BrokerSegmentMetadataCacheTe
     schema.start();
     schema.awaitInitialization();
 
-    Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, 
SOME_DATASOURCE), argumentCaptor.getValue());
+    Assert.assertEquals(datsources, argumentCaptor.getValue());
 
     refreshLatch = new CountDownLatch(1);
     serverView.addSegment(newSegment("xyz", 0), ServerType.HISTORICAL);
@@ -355,7 +360,87 @@ public class BrokerSegmentMetadataCacheTest extends 
BrokerSegmentMetadataCacheTe
     refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
 
     // verify that previously refreshed are included in the last coordinator 
poll
-    Assert.assertEquals(Sets.newHashSet(DATASOURCE1, DATASOURCE2, DATASOURCE3, 
SOME_DATASOURCE, "xyz"), argumentCaptor.getValue());
+    Assert.assertEquals(datsources, argumentCaptor.getValue());
+  }
+
+  @Test
+  public void testRefreshOnEachCycleCentralizedDatasourceSchemaEnabled() 
throws InterruptedException
+  {
+    CentralizedDatasourceSchemaConfig config = 
CentralizedDatasourceSchemaConfig.create();
+    config.setEnabled(true);
+
+    serverView = new TestTimelineServerView(walker.getSegments(), 
Collections.emptyList());
+    druidServers = serverView.getDruidServers();
+
+    BrokerSegmentMetadataCacheConfig metadataCacheConfig = 
BrokerSegmentMetadataCacheConfig.create("PT1S");
+    metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));
+    BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
+        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        serverView,
+        metadataCacheConfig, // use the 1 ms refresh period configured above
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(globalTableJoinable, 
segmentManager),
+        new NoopCoordinatorClient(),
+        config
+    ) {
+      @Override
+      public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> 
dataSourcesToRebuild)
+          throws IOException
+      {
+        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+        refreshLatch.countDown();
+      }
+    };
+
+    // with the feature enabled, refresh should run on every cycle, so all 
three latch counts must be consumed
+    refreshLatch = new CountDownLatch(3);
+    schema.start();
+    schema.awaitInitialization();
+
+    refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    Assert.assertEquals(0, refreshLatch.getCount());
+  }
+
+  @Test
+  public void testRefreshOnEachCycleCentralizedDatasourceSchemaDisabled() 
throws InterruptedException
+  {
+    BrokerSegmentMetadataCacheConfig metadataCacheConfig = 
BrokerSegmentMetadataCacheConfig.create("PT1S");
+    metadataCacheConfig.setMetadataRefreshPeriod(Period.parse("PT0.001S"));
+
+    serverView = new TestTimelineServerView(walker.getSegments(), 
Collections.emptyList());
+    druidServers = serverView.getDruidServers();
+
+    BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
+        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        serverView,
+        metadataCacheConfig, // use the 1 ms refresh period configured above
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(globalTableJoinable, 
segmentManager),
+        new NoopCoordinatorClient(),
+        CentralizedDatasourceSchemaConfig.create()
+    ) {
+      @Override
+      public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> 
dataSourcesToRebuild)
+          throws IOException
+      {
+        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+        refreshLatch.countDown();
+      }
+    };
+
+    // with the feature disabled, refresh should be executed only once (latch stays at 2)
+    refreshLatch = new CountDownLatch(3);
+    schema.start();
+    schema.awaitInitialization();
+
+    refreshLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    Assert.assertEquals(2, refreshLatch.getCount());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to