This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a26206  API to verify a datasource has the latest ingested data 
(#9965)
1a26206 is described below

commit 1a2620606d8187da7725088e9c52ce41b8a692b0
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Tue Jun 16 20:48:30 2020 -1000

    API to verify a datasource has the latest ingested data (#9965)
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * fix checksyle
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * fix spelling
    
    * address comments
    
    * fix checkstyle
    
    * update docs
    
    * fix tests
    
    * fix doc
    
    * address comments
    
    * fix typo
    
    * fix spelling
    
    * address comments
    
    * address comments
    
    * fix typo in docs
---
 docs/ingestion/faq.md                              |  14 +
 docs/operations/api-reference.md                   |  43 ++-
 .../apache/druid/client/CoordinatorServerView.java |   4 +
 .../druid/metadata/SegmentsMetadataManager.java    |  17 ++
 .../druid/metadata/SqlSegmentsMetadataManager.java | 142 ++++++++--
 .../druid/server/coordinator/DruidCoordinator.java |  19 +-
 .../druid/server/http/DataSourcesResource.java     | 133 ++++++++-
 .../metadata/SqlSegmentsMetadataManagerTest.java   | 159 ++++++++++-
 .../druid/server/http/DataSourcesResourceTest.java | 315 ++++++++++++++++++---
 9 files changed, 773 insertions(+), 73 deletions(-)

diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md
index 1e6ffe1..308407f 100644
--- a/docs/ingestion/faq.md
+++ b/docs/ingestion/faq.md
@@ -66,6 +66,20 @@ Other common reasons that hand-off fails are as follows:
 
 Make sure to include the `druid-hdfs-storage` and all the hadoop 
configuration, dependencies (that can be obtained by running command `hadoop 
classpath` on a machine where hadoop has been setup) in the classpath. And, 
provide necessary HDFS settings as described in [deep 
storage](../dependencies/deep-storage.md) .
 
+## How do I know when I can query Druid after submitting a batch 
ingestion task?
+
+You can verify if segments created by a recent ingestion task are loaded onto 
historicals and available for querying using the following workflow.
+1. Submit your ingestion task.
+2. Repeatedly poll the [Overlord's tasks 
API](../operations/api-reference.md#tasks) ( 
`/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be 
successfully completed.
+3. Poll the [Segment Loading by Datasource 
API](../operations/api-reference.md#segment-loading-by-datasource) 
(`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with 
+`forceMetadataRefresh=true` and `interval=<INTERVAL_OF_INGESTED_DATA>` once. 
+(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of 
all datasources. This can be a heavy operation in terms of the load on the 
metadata store but is necessary to make sure that we verify all the latest 
segments' load status)
+If there are segments not yet loaded, continue to step 4, otherwise you can 
now query the data.
+4. Repeatedly poll the [Segment Loading by Datasource 
API](../operations/api-reference.md#segment-loading-by-datasource) 
(`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with 
+`forceMetadataRefresh=false` and `interval=<INTERVAL_OF_INGESTED_DATA>`. 
+Continue polling until all segments are loaded. Once all segments are loaded 
you can now query the data. 
+Note that this workflow only guarantees that the segments are available at the 
time of the [Segment Loading by Datasource 
API](../operations/api-reference.md#segment-loading-by-datasource) call. 
Segments can still become missing because of historical process failures or any 
other reasons afterward.
+
 ## I don't see my Druid segments on my Historical processes
 
 You can check the Coordinator console located at `<COORDINATOR_IP>:<PORT>`. 
Make sure that your segments have actually loaded on [Historical 
processes](../design/historical.md). If your segments are not present, check 
the Coordinator logs for messages about capacity of replication errors. One 
reason that segments are not downloaded is because Historical processes have 
maxSizes that are too small, making them incapable of downloading more data. 
You can change that with (for example):
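
Steps 3 and 4 of the FAQ workflow added in the hunk above can be sketched in Java as follows. This is a minimal sketch and not part of the commit: the class `LoadStatusPoller`, its method names, and the injected `fetchUnavailable` function are hypothetical. `fetchUnavailable` stands in for an HTTP GET that parses the `?simple` response into a count. Only the endpoint path and the `simple`, `forceMetadataRefresh`, and `interval` query parameters are taken from the docs in this diff.

```java
import java.util.function.ToLongFunction;

/** Hypothetical helper sketching steps 3 and 4 of the FAQ workflow. */
final class LoadStatusPoller
{
  /** Builds the "simple" load status URL for one datasource and interval. */
  static String loadStatusUrl(
      String coordinatorBaseUrl,
      String dataSource,
      String interval,
      boolean forceMetadataRefresh
  )
  {
    // The resource code in this commit replaces '_' with '/' in the interval,
    // so the underscore form keeps a slash out of the query string.
    return coordinatorBaseUrl
           + "/druid/coordinator/v1/datasources/" + dataSource + "/loadstatus"
           + "?simple&forceMetadataRefresh=" + forceMetadataRefresh
           + "&interval=" + interval.replace('/', '_');
  }

  /**
   * Calls the API once with forceMetadataRefresh=true, then keeps polling with
   * forceMetadataRefresh=false until no segments are left to load.
   *
   * @param fetchUnavailable assumed to perform the GET and return the number of
   *                         segments still unavailable (a 204 response would
   *                         have to be handled inside this function)
   * @return the total number of API calls made
   */
  static int pollUntilLoaded(
      String coordinatorBaseUrl,
      String dataSource,
      String interval,
      ToLongFunction<String> fetchUnavailable,
      long sleepMillis,
      int maxPolls
  )
  {
    // Step 3: one call that forces a refresh of the Coordinator's metadata cache.
    long remaining = fetchUnavailable.applyAsLong(
        loadStatusUrl(coordinatorBaseUrl, dataSource, interval, true)
    );
    int polls = 1;
    // Step 4: cheaper polls against the cached metadata.
    while (remaining > 0 && polls < maxPolls) {
      try {
        Thread.sleep(sleepMillis);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for segments", e);
      }
      remaining = fetchUnavailable.applyAsLong(
          loadStatusUrl(coordinatorBaseUrl, dataSource, interval, false)
      );
      polls++;
    }
    if (remaining > 0) {
      throw new IllegalStateException(remaining + " segments still unavailable after " + polls + " polls");
    }
    return polls;
  }
}
```

Injecting the fetch function keeps the polling logic independent of any particular HTTP client. As the FAQ text notes, a successful return only says the segments were available at the time of the last call.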
diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md
index a66dd64..a3610a8 100644
--- a/docs/operations/api-reference.md
+++ b/docs/operations/api-reference.md
@@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the 
cluster versus segment
 
  * `/druid/coordinator/v1/loadstatus?simple`
 
-Returns the number of segments left to load until segments that should be 
loaded in the cluster are available for queries. This does not include 
replication.
+Returns the number of segments left to load until segments that should be 
loaded in the cluster are available for queries. This does not include segment 
replication counts.
 
 * `/druid/coordinator/v1/loadstatus?full`
 
-Returns the number of segments left to load in each tier until segments that 
should be loaded in the cluster are all available. This includes replication.
+Returns the number of segments left to load in each tier until segments that 
should be loaded in the cluster are all available. This includes segment 
replication counts.
 
 * `/druid/coordinator/v1/loadqueue`
 
@@ -114,6 +114,45 @@ Returns the number of segments to load and drop, as well 
as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical 
process.
 
+
+#### Segment Loading by Datasource
+
+Note that all _interval_ query parameters are ISO 8601 strings (e.g., 
2016-06-27/2016-06-28).
+Also note that these APIs only guarantee that the segments are available at 
the time of the call. 
+Segments can still become missing because of historical process failures or 
any other reasons afterward.
+
+##### GET
+
+* 
`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus 
segments that should be loaded in the cluster for the given 
+datasource over the given interval (or last 2 weeks if interval is not given). 
`forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll 
latest segment metadata from the metadata store 
+(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of 
all datasources. This can be a heavy operation in terms 
+of the load on the metadata store but can be necessary to make sure that we 
verify all the latest segments' load status)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the 
coordinator from the last force/periodic refresh. 
+If no used segments are found for the given inputs, this API returns `204 No 
Content`
+
+ * 
`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load until segments that should be 
loaded in the cluster are available for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). This does 
not include segment replication counts. `forceMetadataRefresh` is required to 
be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll 
latest segment metadata from the metadata store 
+(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of 
all datasources. This can be a heavy operation in terms 
+of the load on the metadata store but can be necessary to make sure that we 
verify all the latest segments' load status)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the 
coordinator from the last force/periodic refresh. 
+If no used segments are found for the given inputs, this API returns `204 No 
Content` 
+
+* 
`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load in each tier until segments that 
should be loaded in the cluster are all available for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). This 
includes segment replication counts. `forceMetadataRefresh` is required to be 
set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll 
latest segment metadata from the metadata store 
+(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of 
all datasources. This can be a heavy operation in terms 
+of the load on the metadata store but can be necessary to make sure that we 
verify all the latest segments' load status)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the 
coordinator from the last force/periodic refresh. 
+If no used segments are found for the given inputs, this API returns `204 No 
Content`
+
 #### Metadata store information
 
 ##### GET
diff --git 
a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java 
b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
index 2517a8f..538cc2f 100644
--- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
+++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
@@ -200,6 +200,10 @@ public class CoordinatorServerView implements InventoryView
     }
   }
 
+  public Map<SegmentId, SegmentLoadInfo> getSegmentLoadInfos()
+  {
+    return segmentLoadInfos;
+  }
 
   @Override
   public DruidServer getInventoryValue(String serverKey)
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java 
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index 4f97b15..889141a 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -20,6 +20,7 @@
 package org.apache.druid.metadata;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.timeline.DataSegment;
@@ -114,6 +115,22 @@ public interface SegmentsMetadataManager
   Iterable<DataSegment> iterateAllUsedSegments();
 
   /**
+   * Returns an iterable to go over all used and non-overshadowed segments of 
given data sources over given interval.
+   * The order in which segments are iterated is unspecified. Note: the 
iteration may not be as trivially cheap as,
+   * for example, iteration over an ArrayList. Try (to some reasonable extent) 
to organize the code so that it
+   * iterates the returned iterable only once rather than several times.
+   * If {@code requiresLatest} is true then a forced metadata store poll will 
be triggered. This can cause a longer
+   * response time but will ensure that the latest segment information (at the 
time this method is called) is returned.
+   * If {@code requiresLatest} is false then segment information from a stale 
snapshot of up to the last periodic poll
+   * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used.
+   */
+  Optional<Iterable<DataSegment>> 
iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+      String datasource,
+      Interval interval,
+      boolean requiresLatest
+  );
+
+  /**
    * Retrieves all data source names for which there are segment in the 
database, regardless of whether those segments
    * are used or not. If there are no segments in the database, returns an 
empty set.
    *
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 92a8748..60f7f4b 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -20,6 +20,8 @@
 package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
@@ -44,6 +46,7 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -95,7 +98,8 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
   {}
 
   /** Represents periodic {@link #poll}s happening from {@link #exec}. */
-  private static class PeriodicDatabasePoll implements DatabasePoll
+  @VisibleForTesting
+  static class PeriodicDatabasePoll implements DatabasePoll
   {
     /**
      * This future allows to wait until {@link #dataSourcesSnapshot} is 
initialized in the first {@link #poll()}
@@ -104,13 +108,15 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
      * leadership changes.
      */
     final CompletableFuture<Void> firstPollCompletionFuture = new 
CompletableFuture<>();
+    long lastPollStartTimestampInMs = -1;
   }
 
   /**
    * Represents on-demand {@link #poll} initiated at periods of time when 
SqlSegmentsMetadataManager doesn't poll the database
    * periodically.
    */
-  private static class OnDemandDatabasePoll implements DatabasePoll
+  @VisibleForTesting
+  static class OnDemandDatabasePoll implements DatabasePoll
   {
     final long initiationTimeNanos = System.nanoTime();
     final CompletableFuture<Void> pollCompletionFuture = new 
CompletableFuture<>();
@@ -127,7 +133,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
    * called at the same time if two different threads are calling them. This 
might be possible if Coordinator gets and
    * drops leadership repeatedly in quick succession.
    *
-   * This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} 
for times when SqlSegmentsMetadataManager
+   * This lock is also used to synchronize {@link 
#useLatestIfWithinDelayOrPerformNewDatabasePoll} for times when 
SqlSegmentsMetadataManager
    * is not polling the database periodically (in other words, when the 
Coordinator is not the leader).
    */
   private final ReentrantReadWriteLock startStopPollLock = new 
ReentrantReadWriteLock();
@@ -155,7 +161,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
    * easy to forget to do.
    *
    * This field may be updated from {@link #exec}, or from whatever thread 
calling {@link #doOnDemandPoll} via {@link
-   * #awaitOrPerformDatabasePoll()} via one of the public methods of 
SqlSegmentsMetadataManager.
+   * #useLatestIfWithinDelayOrPerformNewDatabasePoll()} via one of the public 
methods of SqlSegmentsMetadataManager.
    */
   private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = 
null;
 
@@ -170,7 +176,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
    * Note that if there is a happens-before relationship between a call to 
{@link #startPollingDatabasePeriodically()}
    * (on Coordinators' leadership change) and one of the methods accessing the 
{@link #dataSourcesSnapshot}'s state in
    * this class the latter is guaranteed to await for the initiated periodic 
poll. This is because when the latter
-   * method calls to {@link #awaitLatestDatabasePoll()} via {@link 
#awaitOrPerformDatabasePoll}, they will
+   * method calls to {@link #useLatestSnapshotIfWithinDelay()} via {@link 
#useLatestIfWithinDelayOrPerformNewDatabasePoll}, they will
    * see the latest {@link PeriodicDatabasePoll} value (stored in this field, 
latestDatabasePoll, in {@link
    * #startPollingDatabasePeriodically()}) and to await on its {@link 
PeriodicDatabasePoll#firstPollCompletionFuture}.
    *
@@ -185,7 +191,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
    * SegmentsMetadataManager} and guarantee that it always returns consistent 
and relatively up-to-date data from methods
    * like {@link #getImmutableDataSourceWithUsedSegments}, while avoiding 
excessive repetitive polls. The last part
    * is achieved via "hooking on" other polls by awaiting on {@link 
PeriodicDatabasePoll#firstPollCompletionFuture} or
-   * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link 
#awaitOrPerformDatabasePoll} method
+   * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link 
#useLatestIfWithinDelayOrPerformNewDatabasePoll} method
    * implementation for details.
    *
    * Note: the overall implementation of periodic/on-demand polls is not 
completely optimal: for example, when the
@@ -194,7 +200,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
    * during Coordinator leadership switches is not a priority.
    *
    * This field is {@code volatile} because it's checked and updated in a 
double-checked locking manner in {@link
-   * #awaitOrPerformDatabasePoll()}.
+   * #useLatestIfWithinDelayOrPerformNewDatabasePoll()}.
    */
   private volatile @Nullable DatabasePoll latestDatabasePoll = null;
 
@@ -311,6 +317,22 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
   private Runnable createPollTaskForStartOrder(long startOrder, 
PeriodicDatabasePoll periodicDatabasePoll)
   {
     return () -> {
+      // If the latest poll was an OnDemandDatabasePoll that started less than 
periodicPollDelay ago,
+      // wait for (periodicPollDelay - (currentTime - 
latestOnDemandDatabasePollStartTime)), then check again.
+      try {
+        long periodicPollDelayNanos = 
TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
+        while (latestDatabasePoll != null
+               && latestDatabasePoll instanceof OnDemandDatabasePoll
+               && ((OnDemandDatabasePoll) 
latestDatabasePoll).nanosElapsedFromInitiation() < periodicPollDelayNanos) {
+          long sleepNano = periodicPollDelayNanos
+                           - ((OnDemandDatabasePoll) 
latestDatabasePoll).nanosElapsedFromInitiation();
+          TimeUnit.NANOSECONDS.sleep(sleepNano);
+        }
+      }
+      catch (Exception e) {
+        log.debug(e, "Exception found while waiting for next periodic poll");
+      }
+
       // poll() is synchronized together with 
startPollingDatabasePeriodically(), stopPollingDatabasePeriodically() and
       // isPollingDatabasePeriodically() to ensure that when 
stopPollingDatabasePeriodically() exits, poll() won't
       // actually run anymore after that (it could only enter the synchronized 
section and exit immediately because the
@@ -320,8 +342,10 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
       lock.lock();
       try {
         if (startOrder == currentStartPollingOrder) {
+          periodicDatabasePoll.lastPollStartTimestampInMs = 
System.currentTimeMillis();
           poll();
           periodicDatabasePoll.firstPollCompletionFuture.complete(null);
+          latestDatabasePoll = periodicDatabasePoll;
         } else {
           log.debug("startOrder = currentStartPollingOrder = %d, skipping 
poll()", startOrder);
         }
@@ -381,16 +405,16 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
     }
   }
 
-  private void awaitOrPerformDatabasePoll()
+  private void useLatestIfWithinDelayOrPerformNewDatabasePoll()
   {
-    // Double-checked locking with awaitLatestDatabasePoll() call playing the 
role of the "check".
-    if (awaitLatestDatabasePoll()) {
+    // Double-checked locking with useLatestSnapshotIfWithinDelay() call 
playing the role of the "check".
+    if (useLatestSnapshotIfWithinDelay()) {
       return;
     }
     ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
     lock.lock();
     try {
-      if (awaitLatestDatabasePoll()) {
+      if (useLatestSnapshotIfWithinDelay()) {
         return;
       }
       OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
@@ -403,11 +427,17 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
   }
 
   /**
-   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or 
an {@link OnDemandDatabasePoll} that is
-   * made not longer than {@link #periodicPollDelay} from now, awaits for it 
and returns true; returns false otherwise,
-   * meaning that a new on-demand database poll should be initiated.
+   * This method returns true without waiting for a database poll if the latest 
{@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has completed its first poll, or an 
{@link OnDemandDatabasePoll} that was
+   * made no longer than {@link #periodicPollDelay} before the current time.
+   * This method waits until completion if the latest {@link 
DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has not completed its first poll, or 
an {@link OnDemandDatabasePoll} that is
+   * already in the process of polling the database.
+   * This means that any method using this check can read from a snapshot that is
+   * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old.
    */
-  private boolean awaitLatestDatabasePoll()
+  @VisibleForTesting
+  boolean useLatestSnapshotIfWithinDelay()
   {
     DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
     if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
@@ -430,6 +460,49 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
     return false;
   }
 
+  /**
+   * This method always forces a database poll if there is no ongoing 
database poll. It then
+   * waits for the new poll or the ongoing poll to complete before returning.
+   * This means that any method using this check can be sure that the latest 
poll for the snapshot was completed after
+   * this method was called.
+   */
+  @VisibleForTesting
+  void forceOrWaitOngoingDatabasePoll()
+  {
+    long checkStartTime = System.currentTimeMillis();
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+    lock.lock();
+    try {
+      DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
+      try {
+        // Verify if a periodic poll completed while we were waiting 
for the lock
+        if (latestDatabasePoll instanceof PeriodicDatabasePoll
+            && ((PeriodicDatabasePoll) 
latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) {
+          return;
+        }
+        // Verify if an on-demand poll completed while we were 
waiting for the lock
+        if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
+          long checkStartTimeNanos = 
TimeUnit.MILLISECONDS.toNanos(checkStartTime);
+          OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) 
latestDatabasePoll;
+          if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) {
+            return;
+          }
+        }
+      }
+      catch (Exception e) {
+        // Latest poll was unsuccessful, try to do a new poll
+        log.debug(e, "Latest poll was unsuccessful. Starting a new poll...");
+      }
+      // Force a database poll
+      OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
+      this.latestDatabasePoll = onDemandDatabasePoll;
+      doOnDemandPoll(onDemandDatabasePoll);
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
   private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
   {
     try {
@@ -857,19 +930,44 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
   @Override
   public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
   {
-    awaitOrPerformDatabasePoll();
+    useLatestIfWithinDelayOrPerformNewDatabasePoll();
     return dataSourcesSnapshot;
   }
 
+  @VisibleForTesting
+  DataSourcesSnapshot getDataSourcesSnapshot()
+  {
+    return dataSourcesSnapshot;
+  }
+
+  @VisibleForTesting
+  DatabasePoll getLatestDatabasePoll()
+  {
+    return latestDatabasePoll;
+  }
+
+
   @Override
   public Iterable<DataSegment> iterateAllUsedSegments()
   {
-    awaitOrPerformDatabasePoll();
-    return () -> dataSourcesSnapshot
-        .getDataSourcesWithAllUsedSegments()
-        .stream()
-        .flatMap(dataSource -> dataSource.getSegments().stream())
-        .iterator();
+    useLatestIfWithinDelayOrPerformNewDatabasePoll();
+    return dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot();
+  }
+
+  @Override
+  public Optional<Iterable<DataSegment>> 
iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource,
+                                                                               
                     Interval interval,
+                                                                               
                     boolean requiresLatest)
+  {
+    if (requiresLatest) {
+      forceOrWaitOngoingDatabasePoll();
+    } else {
+      useLatestIfWithinDelayOrPerformNewDatabasePoll();
+    }
+    VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline
+        = 
dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource);
+    return Optional.fromNullable(usedSegmentsTimeline)
+                   .transform(timeline -> 
timeline.findNonOvershadowedObjectsInInterval(interval, 
Partitions.ONLY_COMPLETE));
   }
 
   @Override
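
The difference between the two freshness modes selected by `requiresLatest` in the hunks above can be illustrated with a much simplified cache. This is a sketch under stated assumptions, not the code from this commit: `SnapshotCache`, its fields, and its single-monitor locking are hypothetical stand-ins for `SqlSegmentsMetadataManager`, which uses a `ReentrantReadWriteLock`, `DatabasePoll` objects, and completion futures instead.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified illustration of "use recent snapshot" versus "force a poll". */
final class SnapshotCache<T>
{
  private final Supplier<T> loader;   // stands in for the metadata store poll
  private final long maxAgeNanos;     // stands in for periodicPollDelay
  private T snapshot;
  private long loadStartNanos;
  private boolean loaded;

  SnapshotCache(Supplier<T> loader, long maxAgeNanos)
  {
    this.loader = loader;
    this.maxAgeNanos = maxAgeNanos;
  }

  T get(boolean requiresLatest)
  {
    // Taken before acquiring the lock, so a load started by another thread
    // while this caller was waiting can be recognized as recent enough.
    final long calledAtNanos = System.nanoTime();
    synchronized (this) {
      final boolean reloadNeeded;
      if (!loaded) {
        reloadNeeded = true;
      } else if (requiresLatest) {
        // Reuse only a load that started after this call began.
        reloadNeeded = loadStartNanos - calledAtNanos <= 0;
      } else {
        // Accept a snapshot that is at most maxAgeNanos old.
        reloadNeeded = System.nanoTime() - loadStartNanos >= maxAgeNanos;
      }
      if (reloadNeeded) {
        loadStartNanos = System.nanoTime();
        snapshot = loader.get();
        loaded = true;
      }
      return snapshot;
    }
  }
}
```

Both timestamps in the sketch come from `System.nanoTime()`, so they share one clock and can be compared by subtraction.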
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 36a414e..c4de364 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -257,14 +257,23 @@ public class DruidCoordinator
    */
   public Map<String, Object2LongMap<String>> 
computeUnderReplicationCountsPerDataSourcePerTier()
   {
+    final Iterable<DataSegment> dataSegments = 
segmentsMetadataManager.iterateAllUsedSegments();
+    return 
computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments);
+  }
+
+  /**
+   * @return tier -> { dataSource -> underReplicationCount } map
+   */
+  public Map<String, Object2LongMap<String>> 
computeUnderReplicationCountsPerDataSourcePerTierForSegments(
+      Iterable<DataSegment> dataSegments
+  )
+  {
     final Map<String, Object2LongMap<String>> 
underReplicationCountsPerDataSourcePerTier = new HashMap<>();
 
     if (segmentReplicantLookup == null) {
       return underReplicationCountsPerDataSourcePerTier;
     }
 
-    final Iterable<DataSegment> dataSegments = 
segmentsMetadataManager.iterateAllUsedSegments();
-
     final DateTime now = DateTimes.nowUtc();
 
     for (final DataSegment segment : dataSegments) {
@@ -320,7 +329,7 @@ public class DruidCoordinator
 
     for (ImmutableDruidDataSource dataSource : dataSources) {
       final Set<DataSegment> segments = 
Sets.newHashSet(dataSource.getSegments());
-      final int numUsedSegments = segments.size();
+      final int numPublishedSegments = segments.size();
 
       // remove loaded segments
       for (DruidServer druidServer : serverInventoryView.getInventory()) {
@@ -333,10 +342,10 @@ public class DruidCoordinator
           }
         }
       }
-      final int numUnloadedSegments = segments.size();
+      final int numUnavailableSegments = segments.size();
       loadStatus.put(
           dataSource.getName(),
-          100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) 
numUsedSegments)
+          100 * ((double) (numPublishedSegments - numUnavailableSegments) / 
(double) numPublishedSegments)
       );
     }
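
As a worked example of the load percentage expression in the hunk above, the arithmetic can be isolated as follows. The class and method names are hypothetical. The expression itself is copied from the diff.

```java
/** Hypothetical wrapper around the percentage expression shown in the diff. */
final class LoadPercentage
{
  static double percentLoaded(int numPublishedSegments, int numUnavailableSegments)
  {
    // Same expression as in the diff: the share of published segments that are available.
    return 100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments);
  }
}
```

With 8 published segments and 2 unavailable, this gives 75.0. With 0 published segments the floating-point division is 0.0 / 0.0, so the result is NaN rather than an exception.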
 
diff --git 
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java 
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index b6d310f..f88d1c7 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -22,11 +22,13 @@ package org.apache.druid.server.http;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.DruidDataSource;
@@ -49,6 +51,7 @@ import org.apache.druid.metadata.UnknownSegmentIdsException;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.http.security.DatasourceResourceFilter;
@@ -96,12 +99,14 @@ import java.util.stream.Collectors;
 public class DataSourcesResource
 {
   private static final Logger log = new Logger(DataSourcesResource.class);
+  private static final long DEFAULT_LOADSTATUS_INTERVAL_OFFSET = 14 * 24 * 60 
* 60 * 1000;
 
   private final CoordinatorServerView serverInventoryView;
   private final SegmentsMetadataManager segmentsMetadataManager;
   private final MetadataRuleManager metadataRuleManager;
   private final IndexingServiceClient indexingServiceClient;
   private final AuthorizerMapper authorizerMapper;
+  private final DruidCoordinator coordinator;
 
   @Inject
   public DataSourcesResource(
@@ -109,7 +114,8 @@ public class DataSourcesResource
       SegmentsMetadataManager segmentsMetadataManager,
       MetadataRuleManager metadataRuleManager,
       @Nullable IndexingServiceClient indexingServiceClient,
-      AuthorizerMapper authorizerMapper
+      AuthorizerMapper authorizerMapper,
+      DruidCoordinator coordinator
   )
   {
     this.serverInventoryView = serverInventoryView;
@@ -117,6 +123,7 @@ public class DataSourcesResource
     this.metadataRuleManager = metadataRuleManager;
     this.indexingServiceClient = indexingServiceClient;
     this.authorizerMapper = authorizerMapper;
+    this.coordinator = coordinator;
   }
 
   @GET
@@ -391,6 +398,130 @@ public class DataSourcesResource
     return getServedSegmentsInInterval(dataSourceName, full != null, 
theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    if (forceMetadataRefresh == null) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("Invalid request. forceMetadataRefresh must be specified")
+          .build();
+    }
+    final Interval theInterval;
+    if (interval == null) {
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - DEFAULT_LOADSTATUS_INTERVAL_OFFSET, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        forceMetadataRefresh
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (Iterables.size(segments.get()) == 0) {
+      return Response
+          .status(Response.Status.NO_CONTENT)
+          .entity("No used segment found for the given datasource and interval")
+          .build();
+    }
+
+    if (simple != null) {
+      // Calculate response for simple mode
+      SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get());
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              segmentsLoadStatistics.getNumUnavailableSegments()
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate response for full mode
+      Map<String, Object2LongMap<String>> segmentLoadMap
+          = coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get());
+      if (segmentLoadMap.isEmpty()) {
+        return Response.serverError()
+                       .entity("Coordinator segment replicant lookup is not initialized yet. Try again later.")
+                       .build();
+      }
+      return Response.ok(segmentLoadMap).build();
+    } else {
+      // Calculate response for default mode
+      SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get());
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments())
+          )
+      ).build();
+    }
+  }
+
+  private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable<DataSegment> segments)
+  {
+    Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+    int numPublishedSegments = 0;
+    int numUnavailableSegments = 0;
+    int numLoadedSegments = 0;
+    for (DataSegment segment : segments) {
+      numPublishedSegments++;
+      if (!segmentLoadInfos.containsKey(segment.getId())) {
+        numUnavailableSegments++;
+      } else {
+        numLoadedSegments++;
+      }
+    }
+    return new SegmentsLoadStatistics(numPublishedSegments, numUnavailableSegments, numLoadedSegments);
+  }
+
+  private static class SegmentsLoadStatistics
+  {
+    private int numPublishedSegments;
+    private int numUnavailableSegments;
+    private int numLoadedSegments;
+
+    SegmentsLoadStatistics(
+        int numPublishedSegments,
+        int numUnavailableSegments,
+        int numLoadedSegments
+    )
+    {
+      this.numPublishedSegments = numPublishedSegments;
+      this.numUnavailableSegments = numUnavailableSegments;
+      this.numLoadedSegments = numLoadedSegments;
+    }
+
+    public int getNumPublishedSegments()
+    {
+      return numPublishedSegments;
+    }
+
+    public int getNumUnavailableSegments()
+    {
+      return numUnavailableSegments;
+    }
+
+    public int getNumLoadedSegments()
+    {
+      return numLoadedSegments;
+    }
+  }
+
   /**
    * The property names belong to the public HTTP JSON API.
    */
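Note on the loadstatus handler added above: the arithmetic behind its "simple" and default response modes can be sketched in isolation. This is only a minimal illustration under stated assumptions, not the Druid code: the class name LoadStatusSketch, the method names, and the use of plain string IDs in place of DataSegment/SegmentId are all hypothetical. It also assumes a non-empty list of published segments, since the handler returns early when no used segment is found.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone sketch; names are not part of the Druid API.
public class LoadStatusSketch
{
  // Two weeks in milliseconds, the same value as DEFAULT_LOADSTATUS_INTERVAL_OFFSET in the diff.
  public static final long DEFAULT_OFFSET_MS = 14L * 24 * 60 * 60 * 1000;

  // "simple" mode: count published segments that are not in the loaded set.
  public static int numUnavailable(List<String> publishedIds, Set<String> loadedIds)
  {
    int unavailable = 0;
    for (String id : publishedIds) {
      if (!loadedIds.contains(id)) {
        unavailable++;
      }
    }
    return unavailable;
  }

  // Default mode: percentage of published segments that are loaded.
  // Assumes publishedIds is non-empty.
  public static double percentLoaded(List<String> publishedIds, Set<String> loadedIds)
  {
    int loaded = publishedIds.size() - numUnavailable(publishedIds, loadedIds);
    return 100 * ((double) loaded / (double) publishedIds.size());
  }

  public static void main(String[] args)
  {
    List<String> published = List.of("seg1", "seg2", "seg3", "seg4");
    Set<String> loaded = Set.of("seg1", "seg2", "seg3");
    System.out.println(numUnavailable(published, loaded)); // prints 1
    System.out.println(percentLoaded(published, loaded));  // prints 75.0
  }
}
```

The "full" mode is not covered here because it depends on the coordinator's per-tier replication lookup.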
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index be6354a..57df440 100644
--- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -20,11 +20,13 @@
 package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -43,6 +45,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 
@@ -117,7 +120,7 @@ public class SqlSegmentsMetadataManagerTest
   {
     TestDerbyConnector connector = derbyConnectorRule.getConnector();
     SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
-    config.setPollDuration(Period.seconds(1));
+    config.setPollDuration(Period.seconds(3));
     sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
         jsonMapper,
         Suppliers.ofInstance(config),
@@ -148,30 +151,124 @@ public class SqlSegmentsMetadataManagerTest
   }
 
   @Test
-  public void testPoll()
+  public void testPollPeriodically()
   {
+    DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertNull(dataSourcesSnapshot);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    // This call makes sure that the first poll is completed
+    sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay();
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertEquals(
         ImmutableSet.of("wikipedia"),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
     Assert.assertEquals(
         ImmutableList.of("wikipedia"),
-        sqlSegmentsMetadataManager
-            .getImmutableDataSourcesWithAllUsedSegments()
-            .stream()
-            .map(ImmutableDruidDataSource::getName)
-            .collect(Collectors.toList())
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
     );
     Assert.assertEquals(
         ImmutableSet.of(segment1, segment2),
-        ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia").getSegments())
+        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
     );
     Assert.assertEquals(
         ImmutableSet.of(segment1, segment2),
-        ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+        ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
+    );
+  }
+
+  @Test
+  public void testPollOnDemand()
+  {
+    DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertNull(dataSourcesSnapshot);
+    // This should return false and not wait/poll anything, as we did not schedule a periodic poll
+    Assert.assertFalse(sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay());
+    Assert.assertNull(dataSourcesSnapshot);
+    // This call will force an on-demand poll
+    sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
+    Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableSet.of("wikipedia"),
+        sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
+    );
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
+    );
+  }
+
+  @Test(timeout = 60_000)
+  public void testPollPeriodicallyAndOnDemandInterleave() throws Exception
+  {
+    DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertNull(dataSourcesSnapshot);
+    sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    // This call makes sure that the first poll is completed
+    sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay();
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
+    );
+    final String newDataSource2 = "wikipedia2";
+    final DataSegment newSegment2 = createNewSegment1(newDataSource2);
+    publisher.publishSegment(newSegment2);
+
+    // This call will force an on-demand poll
+    sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
+    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
+    // New datasource should now be in the snapshot since we just forced an on-demand poll.
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia2", "wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
+    );
+
+    final String newDataSource3 = "wikipedia3";
+    final DataSegment newSegment3 = createNewSegment1(newDataSource3);
+    publisher.publishSegment(newSegment3);
+
+    // This time wait for the periodic poll (not doing an on-demand poll, so we have to wait a bit...)
+    while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) {
+      Thread.sleep(1000);
+    }
+    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
     );
   }
 
@@ -749,4 +846,46 @@ public class SqlSegmentsMetadataManagerTest
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
   }
+
+  @Test
+  public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception
+  {
+    final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000");
+    Optional<Iterable<DataSegment>> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        "wikipedia", theInterval, true
+    );
+    Assert.assertTrue(segments.isPresent());
+    Set<DataSegment> dataSegmentSet = ImmutableSet.copyOf(segments.get());
+    Assert.assertEquals(1, dataSegmentSet.size());
+    Assert.assertTrue(dataSegmentSet.contains(segment1));
+
+    final DataSegment newSegment2 = createSegment(
+        "wikipedia",
+        "2012-03-16T00:00:00.000/2012-03-17T00:00:00.000",
+        "2017-10-15T20:19:12.565Z",
+        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
+        0
+    );
+    publisher.publishSegment(newSegment2);
+
+    // New segment is not returned since we call without force poll
+    segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        "wikipedia", theInterval, false
+    );
+    Assert.assertTrue(segments.isPresent());
+    dataSegmentSet = ImmutableSet.copyOf(segments.get());
+    Assert.assertEquals(1, dataSegmentSet.size());
+    Assert.assertTrue(dataSegmentSet.contains(segment1));
+
+    // New segment is returned since we call with force poll
+    segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        "wikipedia", theInterval, true
+    );
+    Assert.assertTrue(segments.isPresent());
+    dataSegmentSet = ImmutableSet.copyOf(segments.get());
+    Assert.assertEquals(2, dataSegmentSet.size());
+    Assert.assertTrue(dataSegmentSet.contains(segment1));
+    Assert.assertTrue(dataSegmentSet.contains(newSegment2));
+  }
+
 }
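Note on testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval above: the behaviour it checks (a newly published segment is visible only after a forced refresh) can be pictured with a toy model. This is a simplified sketch under stated assumptions, not how SqlSegmentsMetadataManager is implemented: the class SnapshotSketch and its methods are hypothetical, an in-memory list stands in for the metadata store, and the periodic-poll scheduling is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; it only mirrors the cached-versus-forced-refresh idea.
public class SnapshotSketch
{
  // Stand-in for the metadata store (assumption: the real store is a SQL database).
  private final List<String> store = new ArrayList<>();
  // Cached copy, analogous in spirit to the in-memory snapshot.
  private List<String> snapshot = new ArrayList<>();

  public void publish(String segmentId)
  {
    store.add(segmentId);
  }

  // With forceRefresh, re-read the store first; otherwise return the cached copy.
  public List<String> segments(boolean forceRefresh)
  {
    if (forceRefresh) {
      snapshot = new ArrayList<>(store);
    }
    return snapshot;
  }

  public static void main(String[] args)
  {
    SnapshotSketch manager = new SnapshotSketch();
    manager.publish("segment1");
    System.out.println(manager.segments(true).size());  // prints 1
    manager.publish("newSegment2");
    System.out.println(manager.segments(false).size()); // prints 1, cached copy is stale
    System.out.println(manager.segments(true).size());  // prints 2
  }
}
```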
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 10a7f97..39e02ae 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.druid.server.http;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
@@ -39,6 +42,7 @@ import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.rules.IntervalDropRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
@@ -176,7 +180,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server, request);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null);
     Response response = dataSourcesResource.getQueryableDataSources("full", null, request);
     Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
     Assert.assertEquals(200, response.getStatus());
@@ -250,7 +254,7 @@ public class DataSourcesResourceTest
       }
     };
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper, null);
     Response response = dataSourcesResource.getQueryableDataSources("full", null, request);
     Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
 
@@ -289,7 +293,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server, request);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null);
     Response response = dataSourcesResource.getQueryableDataSources(null, "simple", request);
     Assert.assertEquals(200, response.getStatus());
     List<Map<String, Object>> results = (List<Map<String, Object>>) response.getEntity();
@@ -313,7 +317,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", "full");
     ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
     Assert.assertEquals(200, response.getStatus());
@@ -329,7 +333,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Assert.assertEquals(204, dataSourcesResource.getDataSource("none", null).getStatus());
     EasyMock.verify(inventoryView, server);
   }
@@ -347,7 +351,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -380,7 +384,7 @@ public class DataSourcesResourceTest
     EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server, server2, server3)).atLeastOnce();
 
     EasyMock.replay(inventoryView, server, server2, server3);
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -418,7 +422,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView);
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result1 = (Map<String, Map<String, Object>>) response.getEntity();
@@ -463,7 +467,7 @@ public class DataSourcesResourceTest
     expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
     expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
 
     Response response = dataSourcesResource.getIntervalsWithServedSegmentsOrAllServedSegmentsPerIntervals(
         "invalidDataSource",
@@ -523,7 +527,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(inventoryView);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getServedSegmentsInInterval(
         "invalidDataSource",
         "2010-01-01/P1D",
@@ -593,7 +597,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(indexingServiceClient, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null);
+        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null);
     Response response = dataSourcesResource.killUnusedSegmentsInInterval("datasource1", interval);
 
     Assert.assertEquals(200, response.getStatus());
@@ -607,7 +611,7 @@ public class DataSourcesResourceTest
     IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class);
     EasyMock.replay(indexingServiceClient, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null);
+        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null);
     try {
       Response response =
           dataSourcesResource.markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval("datasource", "true", "???");
@@ -630,7 +634,7 @@ public class DataSourcesResourceTest
     Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null);
     Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null);
+        new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null, null);
 
     // test dropped
     EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
@@ -699,7 +703,7 @@ public class DataSourcesResourceTest
     EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(true).once();
     EasyMock.replay(segmentsMetadataManager);
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString());
     Assert.assertEquals(200, response.getStatus());
@@ -713,7 +717,7 @@ public class DataSourcesResourceTest
     EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(false).once();
     EasyMock.replay(segmentsMetadataManager);
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString());
     Assert.assertEquals(200, response.getStatus());
@@ -734,7 +738,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -757,7 +761,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -780,7 +784,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -803,7 +807,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -821,7 +825,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -835,7 +839,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -848,7 +852,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -861,7 +865,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -874,7 +878,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsNoPayload()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null);
     Assert.assertEquals(400, response.getStatus());
@@ -1026,7 +1030,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity());
@@ -1049,7 +1053,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@@ -1074,7 +1078,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(500, response.getStatus());
     Assert.assertNotNull(response.getEntity());
@@ -1096,7 +1100,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity());
@@ -1119,7 +1123,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@@ -1143,7 +1147,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(500, response.getStatus());
     Assert.assertNotNull(response.getEntity());
@@ -1154,7 +1158,7 @@ public class DataSourcesResourceTest
   public void testMarkSegmentsAsUnusedNullPayload()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", null);
     Assert.assertEquals(400, response.getStatus());
@@ -1169,7 +1173,7 @@ public class DataSourcesResourceTest
   public void testMarkSegmentsAsUnusedInvalidPayload()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null);
@@ -1183,7 +1187,7 @@ public class DataSourcesResourceTest
   public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
         new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of());
@@ -1193,6 +1197,251 @@ public class DataSourcesResourceTest
     Assert.assertNotNull(response.getEntity());
   }
 
+  @Test
+  public void testGetDatasourceLoadstatusForceMetadataRefreshNull()
+  {
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null);
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusNoSegmentForInterval()
+  {
+    List<DataSegment> segments = ImmutableList.of();
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
+        "datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.replay(segmentsMetadataManager);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentsMetadataManager,
+        null,
+        null,
+        null,
+        null
+    );
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(204, response.getStatus());
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusDefault()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    DataSegment datasource2Segment1 = new DataSegment(
+        "datasource2",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        30
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+    Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
+        datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
+        datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
+    );
+    Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
+    );
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(100.0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+    EasyMock.reset(segmentsMetadataManager, inventoryView);
+
+    // Test when datasource half loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(50.0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusSimple()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    DataSegment datasource2Segment1 = new DataSegment(
+        "datasource2",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        30
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+    Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
+        datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
+        datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
+    );
+    Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
+    );
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+    EasyMock.reset(segmentsMetadataManager, inventoryView);
+
+    // Test when datasource half loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(1, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusFull()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+
+    final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+    Object2LongMap<String> tier1 = new Object2LongOpenHashMap<>();
+    tier1.put("datasource1", 0L);
+    Object2LongMap<String> tier2 = new Object2LongOpenHashMap<>();
+    tier2.put("datasource1", 3L);
+    underReplicationCountsPerDataSourcePerTier.put("tier1", tier1);
+    underReplicationCountsPerDataSourcePerTier.put("tier2", tier2);
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class);
+    EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments))
+            .andReturn(underReplicationCountsPerDataSourcePerTier).once();
+
+    EasyMock.replay(segmentsMetadataManager, druidCoordinator);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full");
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(2, ((Map) response.getEntity()).size());
+    Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size());
+    Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size());
+    Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1"));
+    Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager);
+  }
+
   private DruidServerMetadata createRealtimeServerMetadata(String name)
   {
     return createServerMetadata(name, ServerType.REALTIME);

