This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new de9da373846 topn with granularity regression fixes (#17565)
de9da373846 is described below

commit de9da373846c1796c9e6fa624956043896129bf1
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Dec 17 07:51:24 2024 -0800

    topn with granularity regression fixes (#17565)
    
    * topn with granularity regression fixes
    
    changes:
    * fix issue where topN with query granularity other than ALL would use the 
heap algorithm when it was actually able to use the pooled algorithm, and 
incorrectly used the pooled algorithm in cases where it must use the heap 
algorithm, a regression from #16533
    * fix issue where topN with query granularity other than ALL could 
incorrectly process values in the wrong time bucket, another regression from 
#16533
    
    * move defensive check outside of loop
    
    * more test
    
    * extra layer of safety
    
    * move check outside of loop
    
    * fix spelling
    
    * add query context parameter to allow using pooled algorithm for topN when 
multiple passes are required even when query granularity is not ALL
    
    * add comment, revert IT context changes and add new context flag
---
 .../resources/queries/twitterstream_queries.json   |  12 +-
 .../org/apache/druid/query/CursorGranularizer.java |  16 +-
 .../java/org/apache/druid/query/QueryContexts.java |   6 +
 .../groupby/epinephelinae/GroupByQueryEngine.java  |   2 +-
 .../topn/AggregateTopNMetricFirstAlgorithm.java    |   1 +
 .../apache/druid/query/topn/BaseTopNAlgorithm.java |   7 +-
 .../Generic1AggPooledTopNScannerPrototype.java     |  38 +-
 .../Generic2AggPooledTopNScannerPrototype.java     |  46 +-
 .../druid/query/topn/PooledTopNAlgorithm.java      | 242 +++---
 .../query/topn/TimeExtractionTopNAlgorithm.java    |  26 +-
 .../apache/druid/query/topn/TopNQueryEngine.java   |  29 +-
 ...llableNumericTopNColumnAggregatesProcessor.java |  32 +-
 .../types/StringTopNColumnAggregatesProcessor.java |  72 +-
 .../druid/query/topn/TopNQueryRunnerTest.java      | 923 +++++++++++++++++++++
 14 files changed, 1213 insertions(+), 239 deletions(-)

diff --git 
a/integration-tests/src/test/resources/queries/twitterstream_queries.json 
b/integration-tests/src/test/resources/queries/twitterstream_queries.json
index cdd4057eb57..104b07ba47a 100644
--- a/integration-tests/src/test/resources/queries/twitterstream_queries.json
+++ b/integration-tests/src/test/resources/queries/twitterstream_queries.json
@@ -94,7 +94,8 @@
             "context": {
                 "useCache": "true",
                 "populateCache": "true",
-                "timeout": 60000
+                "timeout": 60000,
+                "useTopNMultiPassPooledQueryGranularity": "true"
             }
         },
         "expectedResults": [
@@ -198,7 +199,8 @@
             "context": {
                 "useCache": "true",
                 "populateCache": "true",
-                "timeout": 60000
+                "timeout": 60000,
+                "useTopNMultiPassPooledQueryGranularity": "true"
             }
         },
         "expectedResults": [
@@ -322,7 +324,8 @@
             "context": {
                 "useCache": "true",
                 "populateCache": "true",
-                "timeout": 60000
+                "timeout": 60000,
+                "useTopNMultiPassPooledQueryGranularity": "true"
             }
         },
         "expectedResults": [
@@ -741,7 +744,8 @@
             "context": {
                 "useCache": "true",
                 "populateCache": "true",
-                "timeout": 60000
+                "timeout": 60000,
+                "useTopNMultiPassPooledQueryGranularity": "true"
             }
         },
         "expectedResults": [
diff --git 
a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java 
b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java
index 6bc387183ba..ffdb5bd15fa 100644
--- a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java
+++ b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.segment.ColumnValueSelector;
@@ -94,11 +95,13 @@ public class CursorGranularizer
       timeSelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
     }
 
-    return new CursorGranularizer(cursor, bucketIterable, timeSelector, 
timeOrder == Order.DESCENDING);
+    return new CursorGranularizer(cursor, granularity, bucketIterable, 
timeSelector, timeOrder == Order.DESCENDING);
   }
 
   private final Cursor cursor;
 
+  private final Granularity granularity;
+
   // Iterable that iterates over time buckets.
   private final Iterable<Interval> bucketIterable;
 
@@ -112,12 +115,14 @@ public class CursorGranularizer
 
   private CursorGranularizer(
       Cursor cursor,
+      Granularity granularity,
       Iterable<Interval> bucketIterable,
       @Nullable ColumnValueSelector timeSelector,
       boolean descending
   )
   {
     this.cursor = cursor;
+    this.granularity = granularity;
     this.bucketIterable = bucketIterable;
     this.timeSelector = timeSelector;
     this.descending = descending;
@@ -133,13 +138,18 @@ public class CursorGranularizer
     return DateTimes.utc(currentBucketStart);
   }
 
+  public Interval getCurrentInterval()
+  {
+    return Intervals.utc(currentBucketStart, currentBucketEnd);
+  }
+
   public boolean advanceToBucket(final Interval bucketInterval)
   {
+    currentBucketStart = bucketInterval.getStartMillis();
+    currentBucketEnd = bucketInterval.getEndMillis();
     if (cursor.isDone()) {
       return false;
     }
-    currentBucketStart = bucketInterval.getStartMillis();
-    currentBucketEnd = bucketInterval.getEndMillis();
     if (timeSelector == null) {
       return true;
     }
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java 
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 74b45023d04..a2a81c6eb74 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -89,6 +89,12 @@ public class QueryContexts
   public static final String UNCOVERED_INTERVALS_LIMIT_KEY = 
"uncoveredIntervalsLimit";
   public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
   public static final String CATALOG_VALIDATION_ENABLED = 
"catalogValidationEnabled";
+  // this flag controls whether the topN engine can use the 'pooled' algorithm 
when query granularity is set to
+  // anything other than 'ALL' and the cardinality + number of aggregators 
would require more size than is available
+  // in the buffers and so must reset the cursor to use multiple passes. This 
is likely slower than the default
+  // behavior of falling back to heap memory, but less dangerous since too 
large of a query can cause the heap to run
+  // out of memory
+  public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = 
"useTopNMultiPassPooledQueryGranularity";
   /**
    * Context parameter to enable/disable the extended filtered sum rewrite 
logic.
    *
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java
index 8a1f142cd14..35d958f7ad1 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java
@@ -391,7 +391,7 @@ public class GroupByQueryEngine
       if (delegate != null && delegate.hasNext()) {
         return true;
       } else {
-        if (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
+        if (granularizer.currentOffsetWithinBucket()) {
           if (delegate != null) {
             delegate.close();
           }
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
 
b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
index a4605c3f265..b51b79da049 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java
@@ -113,6 +113,7 @@ public class AggregateTopNMetricFirstAlgorithm implements 
TopNAlgorithm<int[], T
     try {
       // reset cursor since we call run again
       params.getCursor().reset();
+      
params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval());
       // Run topN for all metrics for top N dimension values
       allMetricsParam = allMetricAlgo.makeInitParams(params.getSelectorPlus(), 
params.getCursor(), params.getGranularizer());
       allMetricAlgo.run(
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java 
b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
index 4c0bb066eec..b375db72d84 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
@@ -97,12 +97,14 @@ public abstract class BaseTopNAlgorithm<DimValSelector, 
DimValAggregateStore, Pa
     }
     boolean hasDimValSelector = (dimValSelector != null);
 
-    int cardinality = params.getCardinality();
+    final int cardinality = params.getCardinality();
+    final int numValuesPerPass = params.getNumValuesPerPass();
     int numProcessed = 0;
     long processedRows = 0;
     while (numProcessed < cardinality) {
       final int numToProcess;
-      int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality 
- numProcessed);
+      int maxNumToProcess = Math.min(numValuesPerPass, cardinality - 
numProcessed);
+
 
       DimValSelector theDimValSelector;
       if (!hasDimValSelector) {
@@ -125,6 +127,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, 
DimValAggregateStore, Pa
       numProcessed += numToProcess;
       if (numProcessed < cardinality) {
         params.getCursor().reset();
+        
params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval());
       }
     }
     if (queryMetrics != null) {
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java
 
b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java
index f4c2ba1863b..2de0a3db65f 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java
@@ -54,25 +54,27 @@ public final class Generic1AggPooledTopNScannerPrototype 
implements Generic1AggP
   {
     long processedRows = 0;
     int positionToAllocate = 0;
-    while (!cursor.isDoneOrInterrupted()) {
-      final IndexedInts dimValues = dimensionSelector.getRow();
-      final int dimSize = dimValues.size();
-      for (int i = 0; i < dimSize; i++) {
-        int dimIndex = dimValues.get(i);
-        int position = positions[dimIndex];
-        if (position >= 0) {
-          aggregator.aggregate(resultsBuffer, position);
-        } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
-          positions[dimIndex] = positionToAllocate;
-          position = positionToAllocate;
-          aggregator.init(resultsBuffer, position);
-          aggregator.aggregate(resultsBuffer, position);
-          positionToAllocate += aggregatorSize;
+    if (granularizer.currentOffsetWithinBucket()) {
+      while (!cursor.isDoneOrInterrupted()) {
+        final IndexedInts dimValues = dimensionSelector.getRow();
+        final int dimSize = dimValues.size();
+        for (int i = 0; i < dimSize; i++) {
+          int dimIndex = dimValues.get(i);
+          int position = positions[dimIndex];
+          if (position >= 0) {
+            aggregator.aggregate(resultsBuffer, position);
+          } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
+            positions[dimIndex] = positionToAllocate;
+            position = positionToAllocate;
+            aggregator.init(resultsBuffer, position);
+            aggregator.aggregate(resultsBuffer, position);
+            positionToAllocate += aggregatorSize;
+          }
+        }
+        processedRows++;
+        if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
+          break;
         }
-      }
-      processedRows++;
-      if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
-        break;
       }
     }
     return processedRows;
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java
 
b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java
index 4de281d8a0b..1e221992b02 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java
@@ -57,29 +57,31 @@ public final class Generic2AggPooledTopNScannerPrototype 
implements Generic2AggP
     int totalAggregatorsSize = aggregator1Size + aggregator2Size;
     long processedRows = 0;
     int positionToAllocate = 0;
-    while (!cursor.isDoneOrInterrupted()) {
-      final IndexedInts dimValues = dimensionSelector.getRow();
-      final int dimSize = dimValues.size();
-      for (int i = 0; i < dimSize; i++) {
-        int dimIndex = dimValues.get(i);
-        int position = positions[dimIndex];
-        if (position >= 0) {
-          aggregator1.aggregate(resultsBuffer, position);
-          aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
-        } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
-          positions[dimIndex] = positionToAllocate;
-          position = positionToAllocate;
-          aggregator1.init(resultsBuffer, position);
-          aggregator1.aggregate(resultsBuffer, position);
-          position += aggregator1Size;
-          aggregator2.init(resultsBuffer, position);
-          aggregator2.aggregate(resultsBuffer, position);
-          positionToAllocate += totalAggregatorsSize;
+    if (granularizer.currentOffsetWithinBucket()) {
+      while (!cursor.isDoneOrInterrupted()) {
+        final IndexedInts dimValues = dimensionSelector.getRow();
+        final int dimSize = dimValues.size();
+        for (int i = 0; i < dimSize; i++) {
+          int dimIndex = dimValues.get(i);
+          int position = positions[dimIndex];
+          if (position >= 0) {
+            aggregator1.aggregate(resultsBuffer, position);
+            aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
+          } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
+            positions[dimIndex] = positionToAllocate;
+            position = positionToAllocate;
+            aggregator1.init(resultsBuffer, position);
+            aggregator1.aggregate(resultsBuffer, position);
+            position += aggregator1Size;
+            aggregator2.init(resultsBuffer, position);
+            aggregator2.aggregate(resultsBuffer, position);
+            positionToAllocate += totalAggregatorsSize;
+          }
+        }
+        processedRows++;
+        if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
+          break;
         }
-      }
-      processedRows++;
-      if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
-        break;
       }
     }
     return processedRows;
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java 
b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
index 9dfc9ee3ded..f31b6c1e656 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
@@ -477,13 +477,105 @@ public class PooledTopNAlgorithm
     final int aggExtra = aggSize % AGG_UNROLL_COUNT;
     int currentPosition = 0;
     long processedRows = 0;
-    while (!cursor.isDoneOrInterrupted()) {
-      final IndexedInts dimValues = dimSelector.getRow();
-
-      final int dimSize = dimValues.size();
-      final int dimExtra = dimSize % AGG_UNROLL_COUNT;
-      switch (dimExtra) {
-        case 7:
+    if (granularizer.currentOffsetWithinBucket()) {
+      while (!cursor.isDoneOrInterrupted()) {
+        final IndexedInts dimValues = dimSelector.getRow();
+
+        final int dimSize = dimValues.size();
+        final int dimExtra = dimSize % AGG_UNROLL_COUNT;
+        switch (dimExtra) {
+          case 7:
+            currentPosition = aggregateDimValue(
+                positions,
+                theAggregators,
+                resultsBuf,
+                numBytesPerRecord,
+                aggregatorOffsets,
+                aggSize,
+                aggExtra,
+                dimValues.get(6),
+                currentPosition
+            );
+            // fall through
+          case 6:
+            currentPosition = aggregateDimValue(
+                positions,
+                theAggregators,
+                resultsBuf,
+                numBytesPerRecord,
+                aggregatorOffsets,
+                aggSize,
+                aggExtra,
+                dimValues.get(5),
+                currentPosition
+            );
+            // fall through
+          case 5:
+            currentPosition = aggregateDimValue(
+                positions,
+                theAggregators,
+                resultsBuf,
+                numBytesPerRecord,
+                aggregatorOffsets,
+                aggSize,
+                aggExtra,
+                dimValues.get(4),
+                currentPosition
+            );
+            // fall through
+          case 4:
+            currentPosition = aggregateDimValue(
+                positions,
+                theAggregators,
+                resultsBuf,
+                numBytesPerRecord,
+                aggregatorOffsets,
+                aggSize,
+                aggExtra,
+                dimValues.get(3),
+                currentPosition
+            );
+            // fall through
+          case 3:
+            currentPosition = aggregateDimValue(
+                positions,
+                theAggregators,
+                resultsBuf,
+                numBytesPerRecord,
+                aggregatorOffsets,
+                aggSize,
+                aggExtra,
+                dimValues.get(2),
+                currentPosition
+            );
+            // fall through
+          case 2:
+            currentPosition = aggregateDimValue(
+                positions,
+                theAggregators,
+                resultsBuf,
+                numBytesPerRecord,
+                aggregatorOffsets,
+                aggSize,
+                aggExtra,
+                dimValues.get(1),
+                currentPosition
+            );
+            // fall through
+          case 1:
+            currentPosition = aggregateDimValue(
+                positions,
+                theAggregators,
+                resultsBuf,
+                numBytesPerRecord,
+                aggregatorOffsets,
+                aggSize,
+                aggExtra,
+                dimValues.get(0),
+                currentPosition
+            );
+        }
+        for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
           currentPosition = aggregateDimValue(
               positions,
               theAggregators,
@@ -492,11 +584,9 @@ public class PooledTopNAlgorithm
               aggregatorOffsets,
               aggSize,
               aggExtra,
-              dimValues.get(6),
+              dimValues.get(i),
               currentPosition
           );
-          // fall through
-        case 6:
           currentPosition = aggregateDimValue(
               positions,
               theAggregators,
@@ -505,11 +595,9 @@ public class PooledTopNAlgorithm
               aggregatorOffsets,
               aggSize,
               aggExtra,
-              dimValues.get(5),
+              dimValues.get(i + 1),
               currentPosition
           );
-          // fall through
-        case 5:
           currentPosition = aggregateDimValue(
               positions,
               theAggregators,
@@ -518,11 +606,9 @@ public class PooledTopNAlgorithm
               aggregatorOffsets,
               aggSize,
               aggExtra,
-              dimValues.get(4),
+              dimValues.get(i + 2),
               currentPosition
           );
-          // fall through
-        case 4:
           currentPosition = aggregateDimValue(
               positions,
               theAggregators,
@@ -531,11 +617,9 @@ public class PooledTopNAlgorithm
               aggregatorOffsets,
               aggSize,
               aggExtra,
-              dimValues.get(3),
+              dimValues.get(i + 3),
               currentPosition
           );
-          // fall through
-        case 3:
           currentPosition = aggregateDimValue(
               positions,
               theAggregators,
@@ -544,11 +628,9 @@ public class PooledTopNAlgorithm
               aggregatorOffsets,
               aggSize,
               aggExtra,
-              dimValues.get(2),
+              dimValues.get(i + 4),
               currentPosition
           );
-          // fall through
-        case 2:
           currentPosition = aggregateDimValue(
               positions,
               theAggregators,
@@ -557,11 +639,9 @@ public class PooledTopNAlgorithm
               aggregatorOffsets,
               aggSize,
               aggExtra,
-              dimValues.get(1),
+              dimValues.get(i + 5),
               currentPosition
           );
-          // fall through
-        case 1:
           currentPosition = aggregateDimValue(
               positions,
               theAggregators,
@@ -570,103 +650,25 @@ public class PooledTopNAlgorithm
               aggregatorOffsets,
               aggSize,
               aggExtra,
-              dimValues.get(0),
+              dimValues.get(i + 6),
               currentPosition
           );
-      }
-      for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i),
-            currentPosition
-        );
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i + 1),
-            currentPosition
-        );
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i + 2),
-            currentPosition
-        );
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i + 3),
-            currentPosition
-        );
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i + 4),
-            currentPosition
-        );
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i + 5),
-            currentPosition
-        );
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i + 6),
-            currentPosition
-        );
-        currentPosition = aggregateDimValue(
-            positions,
-            theAggregators,
-            resultsBuf,
-            numBytesPerRecord,
-            aggregatorOffsets,
-            aggSize,
-            aggExtra,
-            dimValues.get(i + 7),
-            currentPosition
-        );
-      }
-      processedRows++;
-      if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
-        break;
+          currentPosition = aggregateDimValue(
+              positions,
+              theAggregators,
+              resultsBuf,
+              numBytesPerRecord,
+              aggregatorOffsets,
+              aggSize,
+              aggExtra,
+              dimValues.get(i + 7),
+              currentPosition
+          );
+        }
+        processedRows++;
+        if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
+          break;
+        }
       }
     }
     return processedRows;
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
 
b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index 0c79e7c8d31..b6079a29f29 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -93,20 +93,22 @@ public class TimeExtractionTopNAlgorithm extends 
BaseTopNAlgorithm<int[], Map<Ob
     final DimensionSelector dimSelector = params.getDimSelector();
 
     long processedRows = 0;
-    while (!cursor.isDone()) {
-      final Object key = 
dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
+    if (granularizer.currentOffsetWithinBucket()) {
+      while (!cursor.isDone()) {
+        final Object key = 
dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
 
-      Aggregator[] theAggregators = aggregatesStore.computeIfAbsent(
-          key,
-          k -> makeAggregators(cursor, query.getAggregatorSpecs())
-      );
+        Aggregator[] theAggregators = aggregatesStore.computeIfAbsent(
+            key,
+            k -> makeAggregators(cursor, query.getAggregatorSpecs())
+        );
 
-      for (Aggregator aggregator : theAggregators) {
-        aggregator.aggregate();
-      }
-      processedRows++;
-      if (!granularizer.advanceCursorWithinBucket()) {
-        break;
+        for (Aggregator aggregator : theAggregators) {
+          aggregator.aggregate();
+        }
+        processedRows++;
+        if (!granularizer.advanceCursorWithinBucket()) {
+          break;
+        }
       }
     }
     return processedRows;
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java 
b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
index 1382c9aaa4b..c8aa3292158 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.ColumnSelectorPlus;
 import org.apache.druid.query.CursorGranularizer;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -245,6 +246,11 @@ public class TopNQueryEngine
       final int numBytesPerRecord
   )
   {
+    if (cardinality < 0) {
+      // unknown cardinality doesn't work with the pooled algorithm which 
requires an exact count of dictionary ids
+      return false;
+    }
+
     if (selector.isHasExtractionFn()) {
       // extraction functions can have a many to one mapping, and should use a 
heap algorithm
       return false;
@@ -254,28 +260,35 @@ public class TopNQueryEngine
       // non-string output cannot use the pooled algorith, even if the 
underlying selector supports it
       return false;
     }
+
     if (!Types.is(capabilities, ValueType.STRING)) {
       // non-strings are not eligible to use the pooled algorithm, and should 
use a heap algorithm
       return false;
     }
 
-    // string columns must use the on heap algorithm unless they have the 
following capabilites
     if (!capabilities.isDictionaryEncoded().isTrue() || 
!capabilities.areDictionaryValuesUnique().isTrue()) {
+      // string columns must use the on heap algorithm unless they have the 
following capabilites
       return false;
     }
-    if (Granularities.ALL.equals(query.getGranularity())) {
-      // all other requirements have been satisfied, ALL granularity can 
always use the pooled algorithms
-      return true;
-    }
-    // if not using ALL granularity, we can still potentially use the pooled 
algorithm if we are certain it doesn't
-    // need to make multiple passes (e.g. reset the cursor)
+
+    // num values per pass must be greater than 0 or else the pooled algorithm 
cannot progress
     try (final ResourceHolder<ByteBuffer> resultsBufHolder = 
bufferPool.take()) {
       final ByteBuffer resultsBuf = resultsBufHolder.get();
 
       final int numBytesToWorkWith = resultsBuf.capacity();
       final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith 
/ numBytesPerRecord : cardinality;
 
-      return numValuesPerPass <= cardinality;
+      final boolean allowMultiPassPooled = query.context().getBoolean(
+          QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY,
+          false
+      );
+      if (Granularities.ALL.equals(query.getGranularity()) || 
allowMultiPassPooled) {
+        return numValuesPerPass > 0;
+      }
+
+      // if not using multi-pass for pooled + query granularity other than 
'ALL', we must check that all values can fit
+      // in a single pass
+      return numValuesPerPass >= cardinality;
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
 
b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
index 565ad036cea..41709e35497 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java
@@ -94,23 +94,25 @@ public abstract class 
NullableNumericTopNColumnAggregatesProcessor<Selector exte
   )
   {
     long processedRows = 0;
-    while (!cursor.isDone()) {
-      if (hasNulls && selector.isNull()) {
-        if (nullValueAggregates == null) {
-          nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, 
query.getAggregatorSpecs());
+    if (granularizer.currentOffsetWithinBucket()) {
+      while (!cursor.isDone()) {
+        if (hasNulls && selector.isNull()) {
+          if (nullValueAggregates == null) {
+            nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, 
query.getAggregatorSpecs());
+          }
+          for (Aggregator aggregator : nullValueAggregates) {
+            aggregator.aggregate();
+          }
+        } else {
+          Aggregator[] valueAggregates = getValueAggregators(query, selector, 
cursor);
+          for (Aggregator aggregator : valueAggregates) {
+            aggregator.aggregate();
+          }
         }
-        for (Aggregator aggregator : nullValueAggregates) {
-          aggregator.aggregate();
+        processedRows++;
+        if (!granularizer.advanceCursorWithinBucket()) {
+          break;
         }
-      } else {
-        Aggregator[] valueAggregates = getValueAggregators(query, selector, 
cursor);
-        for (Aggregator aggregator : valueAggregates) {
-          aggregator.aggregate();
-        }
-      }
-      processedRows++;
-      if (!granularizer.advanceCursorWithinBucket()) {
-        break;
       }
     }
     return processedRows;
diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
 
b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
index 9eca369fdc7..ac83de84760 100644
--- 
a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
+++ 
b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java
@@ -150,28 +150,30 @@ public class StringTopNColumnAggregatesProcessor 
implements TopNColumnAggregates
   )
   {
     long processedRows = 0;
-    while (!cursor.isDone()) {
-      final IndexedInts dimValues = selector.getRow();
-      for (int i = 0, size = dimValues.size(); i < size; ++i) {
-        final int dimIndex = dimValues.get(i);
-        Aggregator[] aggs = rowSelector[dimIndex];
-        if (aggs == null) {
-          final Object key = 
dimensionValueConverter.apply(selector.lookupName(dimIndex));
-          aggs = aggregatesStore.computeIfAbsent(
-              key,
-              k -> BaseTopNAlgorithm.makeAggregators(cursor, 
query.getAggregatorSpecs())
-          );
-          rowSelector[dimIndex] = aggs;
+    if (granularizer.currentOffsetWithinBucket()) {
+      while (!cursor.isDone()) {
+        final IndexedInts dimValues = selector.getRow();
+        for (int i = 0, size = dimValues.size(); i < size; ++i) {
+          final int dimIndex = dimValues.get(i);
+          Aggregator[] aggs = rowSelector[dimIndex];
+          if (aggs == null) {
+            final Object key = 
dimensionValueConverter.apply(selector.lookupName(dimIndex));
+            aggs = aggregatesStore.computeIfAbsent(
+                key,
+                k -> BaseTopNAlgorithm.makeAggregators(cursor, 
query.getAggregatorSpecs())
+            );
+            rowSelector[dimIndex] = aggs;
+          }
+
+          for (Aggregator aggregator : aggs) {
+            aggregator.aggregate();
+          }
         }
-
-        for (Aggregator aggregator : aggs) {
-          aggregator.aggregate();
+        processedRows++;
+        if (!granularizer.advanceCursorWithinBucket()) {
+          break;
         }
       }
-      processedRows++;
-      if (!granularizer.advanceCursorWithinBucket()) {
-        break;
-      }
     }
     return processedRows;
   }
@@ -192,22 +194,24 @@ public class StringTopNColumnAggregatesProcessor 
implements TopNColumnAggregates
   )
   {
     long processedRows = 0;
-    while (!cursor.isDone()) {
-      final IndexedInts dimValues = selector.getRow();
-      for (int i = 0, size = dimValues.size(); i < size; ++i) {
-        final int dimIndex = dimValues.get(i);
-        final Object key = 
dimensionValueConverter.apply(selector.lookupName(dimIndex));
-        Aggregator[] aggs = aggregatesStore.computeIfAbsent(
-            key,
-            k -> BaseTopNAlgorithm.makeAggregators(cursor, 
query.getAggregatorSpecs())
-        );
-        for (Aggregator aggregator : aggs) {
-          aggregator.aggregate();
+    if (granularizer.currentOffsetWithinBucket()) {
+      while (!cursor.isDone()) {
+        final IndexedInts dimValues = selector.getRow();
+        for (int i = 0, size = dimValues.size(); i < size; ++i) {
+          final int dimIndex = dimValues.get(i);
+          final Object key = 
dimensionValueConverter.apply(selector.lookupName(dimIndex));
+          Aggregator[] aggs = aggregatesStore.computeIfAbsent(
+              key,
+              k -> BaseTopNAlgorithm.makeAggregators(cursor, 
query.getAggregatorSpecs())
+          );
+          for (Aggregator aggregator : aggs) {
+            aggregator.aggregate();
+          }
+        }
+        processedRows++;
+        if (!granularizer.advanceCursorWithinBucket()) {
+          break;
         }
-      }
-      processedRows++;
-      if (!granularizer.advanceCursorWithinBucket()) {
-        break;
       }
     }
     return processedRows;
diff --git 
a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java 
b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
index 42d8a72a0a0..c3a0982d371 100644
--- 
a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
@@ -60,6 +60,7 @@ import 
org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
 import 
org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import 
org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory;
 import 
org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFactory;
@@ -6378,6 +6379,928 @@ public class TopNQueryRunnerTest extends 
InitializedNullHandlingTest
     assertExpectedResults(expectedResults, query);
   }
 
+
+  @Test
+  public void testTopN_time_granularity_empty_buckets()
+  {
+    assumeTimeOrdered();
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR)
+        .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "index", 2836L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "index", 2681L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "index", 1102L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "index", 2514L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "index", 2193L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "index", 1120L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testTopN_time_granularity_empty_buckets_expression()
+  {
+    assumeTimeOrdered();
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR)
+        .virtualColumns(
+            new ExpressionVirtualColumn(
+                "vc",
+                "market + ' ' + placement",
+                ColumnType.STRING,
+                TestExprMacroTable.INSTANCE
+            )
+        )
+        .dimension("vc")
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        "vc", "total_market preferred",
+                        "index", 2836L
+                    ),
+                    ImmutableMap.of(
+                        "vc", "upfront preferred",
+                        "index", 2681L
+                    ),
+                    ImmutableMap.of(
+                        "vc", "spot preferred",
+                        "index", 1102L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        "vc", "total_market preferred",
+                        "index", 2514L
+                    ),
+                    ImmutableMap.of(
+                        "vc", "upfront preferred",
+                        "index", 2193L
+                    ),
+                    ImmutableMap.of(
+                        "vc", "spot preferred",
+                        "index", 1120L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testTopN_time_granularity_empty_buckets_2pool()
+  {
+    assumeTimeOrdered();
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR)
+        .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(
+            QueryRunnerTestHelper.INDEX_LONG_SUM,
+            QueryRunnerTestHelper.INDEX_DOUBLE_MAX
+        )
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "index", 2836L,
+                        "doubleMaxIndex", 1522.043733
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "index", 2681L,
+                        "doubleMaxIndex", 1447.34116
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "index", 1102L,
+                        "doubleMaxIndex", 158.747224
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "index", 2514L,
+                        "doubleMaxIndex", 1321.375057
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "index", 2193L,
+                        "doubleMaxIndex", 1144.342401
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "index", 1120L,
+                        "doubleMaxIndex", 166.016049
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testTopN_time_granularity_empty_buckets_timeExtract()
+  {
+    // this is pretty weird to have both query granularity and a time 
extractionFn... but it is not explicitly
+    // forbidden so might as well test it
+    assumeTimeOrdered();
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR)
+        .dimension(
+            new ExtractionDimensionSpec(
+                ColumnHolder.TIME_COLUMN_NAME,
+                "dayOfWeek",
+                new TimeFormatExtractionFn("EEEE", null, null, null, false)
+            )
+        )
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(
+            QueryRunnerTestHelper.INDEX_LONG_SUM
+        )
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Collections.singletonList(
+                    ImmutableMap.of(
+                        "dayOfWeek", "Friday",
+                        "index", 6619L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Collections.singletonList(
+                    ImmutableMap.of(
+                        "dayOfWeek", "Saturday",
+                        "index", 5827L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test // Hourly-granularity topN on a LONG dimension: hours without rows are expected back as empty results.
+  public void testTopN_time_granularity_empty_buckets_numeric()
+  {
+    assumeTimeOrdered(); // NOTE(review): helper defined outside this hunk; presumably skips the test unless the data is time-ordered - confirm
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR) // the expected list below holds one Result per hour, 2011-04-01T00 through 2011-04-02T23
+        .dimension(new DefaultDimensionSpec("qualityLong", "qualityLong", 
ColumnType.LONG))
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000) // far above the 9 rows expected in each non-empty bucket
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(
+            QueryRunnerTestHelper.INDEX_LONG_SUM
+        )
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList( // only the two midnight buckets carry rows; rows are listed by descending "index"
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.asList(
+                    ImmutableMap.of(
+                        "qualityLong", 1600L,
+                        "index", 2900L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1400L,
+                        "index", 2870L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1200L,
+                        "index", 158L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1000L,
+                        "index", 135L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1500L,
+                        "index", 121L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1300L,
+                        "index", 120L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1800L,
+                        "index", 119L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1100L,
+                        "index", 118L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1700L,
+                        "index", 78L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), // hours 01 through 23 of 2011-04-01 are all expected empty
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.asList(
+                    ImmutableMap.of(
+                        "qualityLong", 1600L,
+                        "index", 2505L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1400L,
+                        "index", 2447L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1200L,
+                        "index", 166L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1000L,
+                        "index", 147L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1800L,
+                        "index", 126L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1500L,
+                        "index", 114L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1300L,
+                        "index", 113L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1100L,
+                        "index", 112L
+                    ),
+                    ImmutableMap.of(
+                        "qualityLong", 1700L,
+                        "index", 97L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), // hours 01 through 23 of 2011-04-02 are all expected empty
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query); // NOTE(review): helper defined outside this hunk; presumably runs the query and compares against expectedResults - confirm
+  }
+
+  @Test // Hourly-granularity topN on the market dimension with an extra large string "any" aggregator and no query context set.
+  public void testTopN_time_granularity_multipass_no_pool()
+  {
+    assumeTimeOrdered(); // NOTE(review): helper defined outside this hunk; presumably skips the test unless the data is time-ordered - confirm
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR) // the expected list below holds one Result per hour, 2011-04-01T00 through 2011-04-02T23
+        .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000) // far above the 3 rows expected in each non-empty bucket
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(
+            QueryRunnerTestHelper.INDEX_LONG_SUM,
+            new StringAnyAggregatorFactory("big", 
QueryRunnerTestHelper.PLACEMENT_DIMENSION, 4000000, null) // NOTE(review): 4000000 is presumably a max-bytes size picked so the aggregators need multiple passes; per the test name the non-pooled path is expected - confirm
+        )
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList( // only the two midnight buckets carry rows; rows are listed by descending "index"
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "big", "preferred",
+                        "index", 2836L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "big", "preferred",
+                        "index", 2681L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "big", "preferred",
+                        "index", 1102L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), // hours 01 through 23 of 2011-04-01 are all expected empty
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "big", "preferred",
+                        "index", 2514L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "big", "preferred",
+                        "index", 2193L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "big", "preferred",
+                        "index", 1120L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), // hours 01 through 23 of 2011-04-02 are all expected empty
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query); // NOTE(review): helper defined outside this hunk; presumably runs the query and compares against expectedResults - confirm
+  }
+
+  @Test // Hourly-granularity topN on the market dimension with a large string "any" aggregator and the multi-pass pooled context flag set to true.
+  public void testTopN_time_granularity_multipass_with_pooled()
+  {
+    assumeTimeOrdered(); // NOTE(review): helper defined outside this hunk; presumably skips the test unless the data is time-ordered - confirm
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR) // the expected list below holds one Result per hour, 2011-04-01T00 through 2011-04-02T23
+        .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000) // far above the 3 rows expected in each non-empty bucket
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        
.context(ImmutableMap.of(QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY,
 true)) // per the commit message: context parameter that allows the pooled algorithm when multiple passes are required and query granularity is not ALL
+        .aggregators(
+            QueryRunnerTestHelper.INDEX_LONG_SUM,
+            new StringAnyAggregatorFactory("big", 
QueryRunnerTestHelper.PLACEMENT_DIMENSION, 4000000, null) // NOTE(review): 4000000 is presumably a max-bytes size picked so the aggregators need multiple passes - confirm
+        )
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList( // only the two midnight buckets carry rows; rows are listed by descending "index"
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "big", "preferred",
+                        "index", 2836L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "big", "preferred",
+                        "index", 2681L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "big", "preferred",
+                        "index", 1102L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), // hours 01 through 23 of 2011-04-01 are all expected empty
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "big", "preferred",
+                        "index", 2514L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "big", "preferred",
+                        "index", 2193L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "big", "preferred",
+                        "index", 1120L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), // hours 01 through 23 of 2011-04-02 are all expected empty
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query); // NOTE(review): helper defined outside this hunk; presumably runs the query and compares against expectedResults - confirm
+  }
+
+  @Test
+  public void testTopN_time_granularity_uses_heap_if_too_big()
+  {
+    assumeTimeOrdered();
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .granularity(Granularities.HOUR)
+        .dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
+        .metric(QueryRunnerTestHelper.INDEX_METRIC)
+        .threshold(10_000)
+        .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .aggregators(
+            QueryRunnerTestHelper.INDEX_LONG_SUM,
+            new StringAnyAggregatorFactory("big", 
QueryRunnerTestHelper.PLACEMENT_DIMENSION, 40000000, null)
+        )
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Arrays.asList(
+        new Result<>(
+            DateTimes.of("2011-04-01T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "big", "preferred",
+                        "index", 2836L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "big", "preferred",
+                        "index", 2681L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "big", "preferred",
+                        "index", 1102L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(
+            DateTimes.of("2011-04-02T00:00:00.000Z"),
+            TopNResultValue.create(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
+                        "big", "preferred",
+                        "index", 2514L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
+                        "big", "preferred",
+                        "index", 2193L
+                    ),
+                    ImmutableMap.of(
+                        QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
+                        "big", "preferred",
+                        "index", 1120L
+                    )
+                )
+            )
+        ),
+        new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())),
+        new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList()))
+    );
+
+    assertExpectedResults(expectedResults, query);
+  }
+
   private void assumeTimeOrdered()
   {
     try (final CursorHolder cursorHolder =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org

Reply via email to