This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 85f10ed  Support querying realtime segments using time-ordered scan queries and fix broken scan queries without time column (#7454)
85f10ed is described below

commit 85f10ed0d0df0e040a5448ce8b99a2c972537381
Author: Justin Borromeo <jborr...@edu.uwaterloo.ca>
AuthorDate: Fri Apr 12 19:08:34 2019 -0700

    Support querying realtime segments using time-ordered scan queries and fix broken scan queries without time column (#7454)
    
    * Update scan query runner factory to accept SpecificSegmentSpec
    
    *  nit
    
    * Sorry travis
    
    * Improve logging and fix doc
    
    * Bug fix
    
    * Friendlier error msgs and tests to cover bug
    
    * Address Gian's comments
    
    * Fix doc
    
    * Added tests for empty and null column list
    
    * Style
    
    * Fix checking wrong order (looking at query param when it should be
    looking at the null-handled order)
    
    * Add test case for null order
    
    * Fix ScanQueryRunnerTest
    
    * Forbidden APIs fixed
---
 docs/content/querying/scan-query.md                |   2 +-
 .../org/apache/druid/query/scan/ScanQuery.java     |  10 +
 .../druid/query/scan/ScanQueryRunnerFactory.java   |  37 +-
 .../apache/druid/query/scan/ScanResultValue.java   |  10 +-
 .../scan/ScanResultValueTimestampComparator.java   |   3 +-
 .../query/spec/MultipleSpecificSegmentSpec.java    |  10 +-
 .../query/scan/ScanQueryRunnerFactoryTest.java     | 394 ++++++++++++---------
 .../druid/query/scan/ScanQueryRunnerTest.java      | 269 +++++++++-----
 .../org/apache/druid/query/scan/ScanQueryTest.java | 272 ++++++++++++++
 .../ScanResultValueTimestampComparatorTest.java    |   8 +-
 10 files changed, 743 insertions(+), 272 deletions(-)

diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md
index 7ba5c60..6421047 100644
--- a/docs/content/querying/scan-query.md
+++ b/docs/content/querying/scan-query.md
@@ -61,7 +61,7 @@ The following are the main parameters for Scan queries:
 |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
 |batchSize|How many rows buffered before return to client. Default is `20480`|no|
 |limit|How many rows to return. If not specified, all rows will be returned.|no|
-|order|The ordering of returned rows based on timestamp.  "ascending", "descending", and "none" (default) are supported.  Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`.  Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to a order of "none".|none|
+|order|The ordering of returned rows based on timestamp.  "ascending", "descending", and "none" (default) are supported.  Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met.|none|
 |legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
 |context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no|
 
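(For context, not part of the patch: a minimal sketch of a scan query that satisfies the
requirement documented above, i.e. the __time column is selected explicitly whenever a
time order is requested. The datasource name, interval, and second column are
placeholders; the builder calls are the same ones the tests in this commit use.)

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.query.Druids;
    import org.apache.druid.query.scan.ScanQuery;
    import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
    import org.apache.druid.segment.column.ColumnHolder;
    import org.joda.time.Interval;

    import java.util.Collections;

    class TimeOrderedScanExample
    {
      static ScanQuery timeOrderedQuery()
      {
        return Druids.newScanQueryBuilder()
                     .dataSource("some datasource")  // placeholder datasource
                     .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(
                         new Interval(DateTimes.of("2012-01-01"), DateTimes.of("2012-01-01").plusHours(1)))))
                     // __time must appear in the column list when order is "ascending"/"descending"
                     .columns(ImmutableList.of(ColumnHolder.TIME_COLUMN_NAME, "quality"))
                     .order(ScanQuery.Order.ASCENDING)
                     .limit(3)
                     .build();
      }
    }
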
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
index 2b6fc82..3f6d407 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
@@ -34,6 +34,7 @@ import org.apache.druid.query.Query;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -149,6 +150,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
     this.columns = columns;
     this.legacy = legacy;
     this.order = (order == null) ? Order.NONE : order;
+    if (this.order != Order.NONE) {
+      Preconditions.checkArgument(
+          columns == null || columns.size() == 0 || columns.contains(ColumnHolder.TIME_COLUMN_NAME),
+          "The __time column must be selected if the results are time-ordered."
+      );
+    }
     this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
     this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
   }
@@ -256,6 +263,9 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
   @Override
   public Ordering<ScanResultValue> getResultOrdering()
   {
+    if (order == Order.NONE) {
+      return Ordering.natural();
+    }
     return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse();
   }
 
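(The Preconditions check above rejects invalid queries at construction time, and
getResultOrdering now returns Ordering.natural() for unordered queries so the broker's
n-way merge can still combine their results. A sketch of the failure mode, mirroring
the new ScanQueryTest cases later in this commit; imports as in the sketch above, plus
org.apache.druid.query.spec.QuerySegmentSpec.)

    QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(
        new Interval(DateTimes.of("2012-01-01"), DateTimes.of("2012-01-01").plusHours(1))));

    // Throws IllegalArgumentException: "The __time column must be selected if the results are time-ordered."
    Druids.newScanQueryBuilder()
          .order(ScanQuery.Order.ASCENDING)
          .columns(ImmutableList.of("quality"))  // non-empty, but missing __time
          .dataSource("source")
          .intervals(intervalSpec)
          .build();

    // Accepted: a null or empty column list means "select all columns", which includes __time.
    Druids.newScanQueryBuilder()
          .order(ScanQuery.Order.DESCENDING)
          .columns(ImmutableList.of())
          .dataSource("source")
          .intervals(intervalSpec)
          .build();
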
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 5f49a66..0a9f3b9 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -40,11 +40,14 @@ import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.Segment;
 import org.joda.time.Interval;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.LinkedHashMap;
@@ -111,17 +114,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
           return returnedRows;
         }
       } else {
-        // Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need
-        // to be present for a 1:1 matching of intervals with query runners.  The other types of segment spec condense
-        // the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals
-        // and query runners.
-        if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
-          throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
-                        + "of type MultipleSpecificSegmentSpec");
-        }
-        // Ascending time order for both descriptors and query runners by default
-        List<SegmentDescriptor> descriptorsOrdered =
-            ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors();
+        List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
         List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
 
         if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
@@ -287,6 +280,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
   }
 
   @VisibleForTesting
+  List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
+  {
+    // Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
+    // segment descriptors need to be present for a 1:1 matching of intervals with query runners.
+    // The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
+    // the 1:1 relationship between intervals and query runners.
+    List<SegmentDescriptor> descriptorsOrdered;
+
+    if (spec instanceof MultipleSpecificSegmentSpec) {
+      // Ascending time order for both descriptors and query runners by default
+      descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
+    } else if (spec instanceof SpecificSegmentSpec) {
+      descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
+    } else {
+      throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+                    + " of type MultipleSpecificSegmentSpec or SpecificSegmentSpec... a [%s] was received instead.",
+                    spec.getClass().getSimpleName());
+    }
+    }
+    return descriptorsOrdered;
+  }
+
+  @VisibleForTesting
   Sequence<ScanResultValue> nWayMergeAndLimit(
       List<List<QueryRunner<ScanResultValue>>> groupedRunners,
       QueryPlus<ScanResultValue> queryPlus,
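(A sketch of the new helper's contract, mirroring the ScanQueryRunnerFactoryTest cases
below; "factory" is a ScanQueryRunnerFactory instance and "descriptor" a placeholder
SegmentDescriptor.)

    SegmentDescriptor descriptor = new SegmentDescriptor(
        new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0);

    // Specific specs expose their descriptors directly, preserving the 1:1 interval-to-runner mapping...
    factory.getSegmentDescriptorsFromSpecificQuerySpec(
        new MultipleSpecificSegmentSpec(Collections.singletonList(descriptor)));  // -> [descriptor]
    factory.getSegmentDescriptorsFromSpecificQuerySpec(
        new SpecificSegmentSpec(descriptor));                                     // -> [descriptor]

    // ...while interval-based specs (e.g. MultipleIntervalSegmentSpec, LegacySegmentSpec)
    // condense intervals and therefore throw UOE.
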
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
index 7bfcf02..a05e715 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.scan;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.segment.column.ColumnHolder;
 
@@ -78,9 +79,16 @@ public class ScanResultValue implements Comparable<ScanResultValue>
   public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
   {
     if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-      return (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+      Long timestamp = (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+      if (timestamp == null) {
+        throw new ISE("Unable to compare timestamp for rows without a time column");
+      }
+      return timestamp;
     } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
       int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
+      if (timeColumnIndex == -1) {
+        throw new ISE("Unable to compare timestamp for rows without a time column");
+      }
       List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
       return (Long) firstEvent.get(timeColumnIndex);
     }
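(A sketch of the new failure mode, mirroring the ScanQueryTest case added below: asking
for the first event timestamp of a ScanResultValue whose rows lack __time now fails with
a descriptive ISE rather than an opaque unboxing or indexing error; imports assumed.)

    ScanResultValue noTimeColumn = new ScanResultValue(
        "segmentId",
        Collections.singletonList("yah"),
        Collections.singletonList(ImmutableMap.of("yah", "yeet")));

    // Throws ISE: "Unable to compare timestamp for rows without a time column"
    noTimeColumn.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
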
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
index dcf3bad..69f780f 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
@@ -42,8 +42,7 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResult
   @Override
   public int compare(ScanResultValue o1, ScanResultValue o2)
   {
-    int comparison;
-    comparison = Longs.compare(
+    int comparison = Longs.compare(
         o1.getFirstEventTimestamp(scanQuery.getResultFormat()),
         o2.getFirstEventTimestamp(scanQuery.getResultFormat()));
     if (scanQuery.getOrder().equals(ScanQuery.Order.DESCENDING)) {
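(A small sketch of the comparator's contract, mirroring
ScanResultValueTimestampComparatorTest below; s1 and s2 are assumed to be
ScanResultValues whose first events carry __time values 42 and 43 respectively, and
intervalSpec is as in the earlier sketches.)

    ScanQuery descendingQuery = Druids.newScanQueryBuilder()
                                      .order(ScanQuery.Order.DESCENDING)
                                      .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
                                      .dataSource("some src")
                                      .intervals(intervalSpec)
                                      .build();

    // Longs.compare(42, 43) == -1, negated for descending order, so s1 compares
    // after s2 and the later timestamp sorts first.
    int cmp = new ScanResultValueTimestampComparator(descendingQuery).compare(s1, s2);  // cmp > 0
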
diff --git a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java
index 6262335..34a458d 100644
--- a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java
@@ -21,7 +21,6 @@ package org.apache.druid.query.spec;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.query.Query;
@@ -64,14 +63,7 @@ public class MultipleSpecificSegmentSpec implements QuerySegmentSpec
     intervals = JodaUtils.condenseIntervals(
         Iterables.transform(
             descriptors,
-            new Function<SegmentDescriptor, Interval>()
-            {
-              @Override
-              public Interval apply(SegmentDescriptor input)
-              {
-                return input.getInterval();
-              }
-            }
+            input -> input.getInterval()
         )
     );
 
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
index a7cf3c6..6559c65 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
@@ -31,9 +31,15 @@ import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.spec.LegacySegmentSpec;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -43,12 +49,9 @@ import java.util.Collections;
 import java.util.List;
 
 
-@RunWith(Parameterized.class)
+@RunWith(Enclosed.class)
 public class ScanQueryRunnerFactoryTest
 {
-  private int numElements;
-  private ScanQuery query;
-  private ScanQuery.ResultFormat resultFormat;
 
   private static final ScanQueryRunnerFactory factory = new ScanQueryRunnerFactory(
       new ScanQueryQueryToolChest(
@@ -59,200 +62,261 @@ public class ScanQueryRunnerFactoryTest
       new ScanQueryConfig()
   );
 
-  public ScanQueryRunnerFactoryTest(
-      final int numElements,
-      final int batchSize,
-      final long limit,
-      final ScanQuery.ResultFormat resultFormat,
-      final ScanQuery.Order order
-  )
+  @RunWith(Parameterized.class)
+  public static class ScanQueryRunnerFactoryParameterizedTest
   {
-    this.numElements = numElements;
-    this.query = Druids.newScanQueryBuilder()
-                       .batchSize(batchSize)
-                       .limit(limit)
-                       .order(order)
-                       .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
-                       .dataSource("some datasource")
-                       .resultFormat(resultFormat)
-                       .build();
-    this.resultFormat = resultFormat;
-  }
+    private int numElements;
+    private ScanQuery query;
+    private ScanQuery.ResultFormat resultFormat;
 
-  @Parameterized.Parameters(name = "{0} {1} {2} {3} {4}")
-  public static Iterable<Object[]> constructorFeeder()
-  {
-    List<Integer> numsElements = ImmutableList.of(0, 10, 100);
-    List<Integer> batchSizes = ImmutableList.of(1, 100);
-    List<Long> limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE);
-    List<ScanQuery.ResultFormat> resultFormats = ImmutableList.of(
-        ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
-        ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST
-    );
-    List<ScanQuery.Order> order = ImmutableList.of(
-        ScanQuery.Order.ASCENDING,
-        ScanQuery.Order.DESCENDING
-    );
-
-    return QueryRunnerTestHelper.cartesian(
-        numsElements,
-        batchSizes,
-        limits,
-        resultFormats,
-        order
-    );
-  }
+    public ScanQueryRunnerFactoryParameterizedTest(
+        final int numElements,
+        final int batchSize,
+        final long limit,
+        final ScanQuery.ResultFormat resultFormat,
+        final ScanQuery.Order order
+    )
+    {
+      this.numElements = numElements;
+      this.query = Druids.newScanQueryBuilder()
+                         .batchSize(batchSize)
+                         .limit(limit)
+                         .order(order)
+                         .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
+                         .dataSource("some datasource")
+                         .resultFormat(resultFormat)
+                         .build();
+      this.resultFormat = resultFormat;
+    }
 
-  @Test
-  public void testSortAndLimitScanResultValues()
-  {
-    List<ScanResultValue> srvs = new ArrayList<>(numElements);
-    List<Long> expectedEventTimestamps = new ArrayList<>();
-    for (int i = 0; i < numElements; i++) {
-      long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis();
-      expectedEventTimestamps.add(timestamp);
-      srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
+    @Parameterized.Parameters(name = "{0} {1} {2} {3} {4}")
+    public static Iterable<Object[]> constructorFeeder()
+    {
+      List<Integer> numsElements = ImmutableList.of(0, 10, 100);
+      List<Integer> batchSizes = ImmutableList.of(1, 100);
+      List<Long> limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE);
+      List<ScanQuery.ResultFormat> resultFormats = ImmutableList.of(
+          ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
+          ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST
+      );
+      List<ScanQuery.Order> order = ImmutableList.of(
+          ScanQuery.Order.ASCENDING,
+          ScanQuery.Order.DESCENDING
+      );
+
+      return QueryRunnerTestHelper.cartesian(
+          numsElements,
+          batchSizes,
+          limits,
+          resultFormats,
+          order
+      );
     }
-    expectedEventTimestamps.sort((o1, o2) -> {
-      int retVal = 0;
-      if (o1 > o2) {
-        retVal = 1;
-      } else if (o1 < o2) {
-        retVal = -1;
+
+    @Test
+    public void testSortAndLimitScanResultValues()
+    {
+      List<ScanResultValue> srvs = new ArrayList<>(numElements);
+      List<Long> expectedEventTimestamps = new ArrayList<>();
+      for (int i = 0; i < numElements; i++) {
+        long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis();
+        expectedEventTimestamps.add(timestamp);
+        srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
       }
-      if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-        return retVal * -1;
+      expectedEventTimestamps.sort((o1, o2) -> {
+        int retVal = 0;
+        if (o1 > o2) {
+          retVal = 1;
+        } else if (o1 < o2) {
+          retVal = -1;
+        }
+        if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
+          return retVal * -1;
+        }
+        return retVal;
+      });
+      Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
+      try {
+        List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
+            inputSequence,
+            query,
+            ImmutableList.of(new SegmentDescriptor(new Interval(
+                DateTimes.of("2010-01-01"),
+                DateTimes.of("2019-01-01").plusHours(1)
+            ), "1", 0))
+        ).toList();
+        if (query.getLimit() > Integer.MAX_VALUE) {
+          Assert.fail("Unsupported exception should have been thrown due to 
high limit");
+        }
+        validateSortedOutput(output, expectedEventTimestamps);
       }
-      return retVal;
-    });
-    Sequence<ScanResultValue> inputSequence = Sequences.simple(srvs);
-    try {
-      List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
-          inputSequence,
-          query,
-          ImmutableList.of(new SegmentDescriptor(new Interval(
-              DateTimes.of("2010-01-01"),
-              DateTimes.of("2019-01-01").plusHours(1)
-          ), "1", 0))
-      ).toList();
-      if (query.getLimit() > Integer.MAX_VALUE) {
-        Assert.fail("Unsupported exception should have been thrown due to high 
limit");
+      catch (UOE e) {
+        if (query.getLimit() <= Integer.MAX_VALUE) {
+          Assert.fail("Unsupported operation exception should not have been 
thrown here");
+        }
       }
-      validateSortedOutput(output, expectedEventTimestamps);
     }
-    catch (UOE e) {
-      if (query.getLimit() <= Integer.MAX_VALUE) {
-        Assert.fail("Unsupported operation exception should not have been 
thrown here");
+
+    @Test
+    public void testNWayMerge()
+    {
+      List<Long> expectedEventTimestamps = new ArrayList<>(numElements * 3);
+
+      List<ScanResultValue> scanResultValues1 = new ArrayList<>(numElements);
+      for (int i = 0; i < numElements; i++) {
+        long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis();
+        expectedEventTimestamps.add(timestamp);
+        scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
       }
-    }
-  }
 
-  @Test
-  public void testNWayMerge()
-  {
-    List<Long> expectedEventTimestamps = new ArrayList<>(numElements * 3);
+      List<ScanResultValue> scanResultValues2 = new ArrayList<>(numElements);
+      for (int i = 0; i < numElements; i++) {
+        long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis();
+        expectedEventTimestamps.add(timestamp);
+        scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
+      }
 
-    List<ScanResultValue> scanResultValues1 = new ArrayList<>(numElements);
-    for (int i = 0; i < numElements; i++) {
-      long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis();
-      expectedEventTimestamps.add(timestamp);
-      scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
-    }
+      List<ScanResultValue> scanResultValues3 = new ArrayList<>(numElements);
+      for (int i = 0; i < numElements; i++) {
+        long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis();
+        expectedEventTimestamps.add(timestamp);
+        scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
+      }
 
-    List<ScanResultValue> scanResultValues2 = new ArrayList<>(numElements);
-    for (int i = 0; i < numElements; i++) {
-      long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis();
-      expectedEventTimestamps.add(timestamp);
-      scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
-    }
+      if (query.getOrder() == ScanQuery.Order.DESCENDING) {
+        Collections.reverse(scanResultValues1);
+        Collections.reverse(scanResultValues2);
+        Collections.reverse(scanResultValues3);
+      }
 
-    List<ScanResultValue> scanResultValues3 = new ArrayList<>(numElements);
-    for (int i = 0; i < numElements; i++) {
-      long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis();
-      expectedEventTimestamps.add(timestamp);
-      scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
-    }
+      QueryRunner<ScanResultValue> runnerSegment1Partition1 =
+          (queryPlus, responseContext) -> Sequences.simple(scanResultValues1);
+
+      QueryRunner<ScanResultValue> runnerSegment1Partition2 =
+          (queryPlus, responseContext) -> Sequences.simple(scanResultValues2);
 
-    if (query.getOrder() == ScanQuery.Order.DESCENDING) {
-      Collections.reverse(scanResultValues1);
-      Collections.reverse(scanResultValues2);
-      Collections.reverse(scanResultValues3);
-    }
 
-    QueryRunner<ScanResultValue> runnerSegment1Partition1 =
-        (queryPlus, responseContext) -> Sequences.simple(scanResultValues1);
+      QueryRunner<ScanResultValue> runnerSegment2Partition1 =
+          (queryPlus, responseContext) -> Sequences.simple(scanResultValues3);
 
-    QueryRunner<ScanResultValue> runnerSegment1Partition2 =
-        (queryPlus, responseContext) -> Sequences.simple(scanResultValues2);
+      QueryRunner<ScanResultValue> runnerSegment2Partition2 =
+          (queryPlus, responseContext) -> Sequences.empty();
 
+      List<List<QueryRunner<ScanResultValue>>> groupedRunners = new ArrayList<>(2);
 
-    QueryRunner<ScanResultValue> runnerSegment2Partition1 =
-        (queryPlus, responseContext) -> Sequences.simple(scanResultValues3);
+      if (query.getOrder() == ScanQuery.Order.DESCENDING) {
+        groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
+        groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
+      } else {
+        groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
+        groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
+      }
 
-    QueryRunner<ScanResultValue> runnerSegment2Partition2 =
-        (queryPlus, responseContext) -> Sequences.empty();
+      expectedEventTimestamps.sort((o1, o2) -> {
+        int retVal = 0;
+        if (o1 > o2) {
+          retVal = 1;
+        } else if (o1 < o2) {
+          retVal = -1;
+        }
+        if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
+          return retVal * -1;
+        }
+        return retVal;
+      });
 
-    List<List<QueryRunner<ScanResultValue>>> groupedRunners = new ArrayList<>(2);
+      List<ScanResultValue> output =
+          factory.nWayMergeAndLimit(
+              groupedRunners,
+              QueryPlus.wrap(query),
+              ImmutableMap.of()
+          ).toList();
 
-    if (query.getOrder() == ScanQuery.Order.DESCENDING) {
-      groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
-      groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
-    } else {
-      groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2));
-      groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2));
+      validateSortedOutput(output, expectedEventTimestamps);
     }
 
-    expectedEventTimestamps.sort((o1, o2) -> {
-      int retVal = 0;
-      if (o1 > o2) {
-        retVal = 1;
-      } else if (o1 < o2) {
-        retVal = -1;
+    private void validateSortedOutput(List<ScanResultValue> output, List<Long> expectedEventTimestamps)
+    {
+      // check each scan result value has one event
+      for (ScanResultValue srv : output) {
+        if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
+          Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1);
+        } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
+          Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1);
+        }
       }
-      if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-        return retVal * -1;
+
+      // check total # of rows <= limit
+      Assert.assertTrue(output.size() <= query.getLimit());
+
+      // check ordering is correct
+      for (int i = 1; i < output.size(); i++) {
+        if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
+          Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) <
+                            output.get(i - 1).getFirstEventTimestamp(resultFormat));
+        } else {
+          Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) >
+                            output.get(i - 1).getFirstEventTimestamp(resultFormat));
+        }
       }
-      return retVal;
-    });
-
-    List<ScanResultValue> output =
-        factory.nWayMergeAndLimit(
-            groupedRunners,
-            QueryPlus.wrap(query),
-            ImmutableMap.of()
-        ).toList();
 
-    validateSortedOutput(output, expectedEventTimestamps);
+      // check the values are correct
+      for (int i = 0; i < query.getLimit() && i < output.size(); i++) {
+        Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat));
+      }
+    }
   }
 
-  private void validateSortedOutput(List<ScanResultValue> output, List<Long> expectedEventTimestamps)
+  public static class ScanQueryRunnerFactoryNonParameterizedTest
   {
-    // check each scan result value has one event
-    for (ScanResultValue srv : output) {
-      if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
-        Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1);
-      } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-        Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1);
-      }
-    }
+    private SegmentDescriptor descriptor = new SegmentDescriptor(new Interval(
+        DateTimes.of("2010-01-01"),
+        DateTimes.of("2019-01-01").plusHours(1)
+    ), "1", 0);
 
-    // check total # of rows <= limit
-    Assert.assertTrue(output.size() <= query.getLimit());
+    @Test
+    public void testGetValidSegmentDescriptorsFromSpec()
+    {
+      QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec(
+          Collections.singletonList(
+              descriptor
+          )
+      );
+      QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor);
 
-    // check ordering is correct
-    for (int i = 1; i < output.size(); i++) {
-      if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-        Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) <
-                          output.get(i - 1).getFirstEventTimestamp(resultFormat));
-      } else {
-        Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) >
-                          output.get(i - 1).getFirstEventTimestamp(resultFormat));
-      }
+      List<SegmentDescriptor> descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec);
+      Assert.assertEquals(1, descriptors.size());
+      Assert.assertEquals(descriptor, descriptors.get(0));
+
+      descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec);
+      Assert.assertEquals(1, descriptors.size());
+      Assert.assertEquals(descriptor, descriptors.get(0));
+    }
+
+    @Test(expected = UOE.class)
+    public void testGetSegmentDescriptorsFromInvalidIntervalSpec()
+    {
+      QuerySegmentSpec multiIntervalSpec = new MultipleIntervalSegmentSpec(
+          Collections.singletonList(
+              new Interval(
+                  DateTimes.of("2010-01-01"),
+                  DateTimes.of("2019-01-01").plusHours(1)
+              )
+          )
+      );
+      factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec);
     }
 
-    // check the values are correct
-    for (int i = 0; i < query.getLimit() && i < output.size(); i++) {
-      Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat));
+    @Test(expected = UOE.class)
+    public void testGetSegmentDescriptorsFromInvalidLegacySpec()
+    {
+      QuerySegmentSpec legacySpec = new LegacySegmentSpec(
+          new Interval(
+              DateTimes.of("2010-01-01"),
+              DateTimes.of("2019-01-01").plusHours(1)
+          )
+      );
+      factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec);
     }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
index b3a0d00..2928ca9 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
@@ -67,6 +67,7 @@ import java.util.Map;
 import java.util.Set;
 
 /**
+ *
  */
 @RunWith(Parameterized.class)
 public class ScanQueryRunnerTest
@@ -143,11 +144,11 @@ public class ScanQueryRunnerTest
   private Druids.ScanQueryBuilder newTestQuery()
   {
     return Druids.newScanQueryBuilder()
-                    .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource))
-                    .columns(Collections.emptyList())
-                    .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
-                    .limit(3)
-                    .legacy(legacy);
+                 .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource))
+                 .columns(Collections.emptyList())
+                 .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
+                 .limit(3)
+                 .legacy(legacy);
   }
 
   @Test
@@ -524,7 +525,11 @@ public class ScanQueryRunnerTest
       ScanQuery query = newTestQuery()
           .intervals(I_0112_0114)
           .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
-          .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
+          .columns(
+              QueryRunnerTestHelper.timeDimension,
+              QueryRunnerTestHelper.qualityDimension,
+              QueryRunnerTestHelper.indexMetric
+          )
           .limit(limit)
           .order(ScanQuery.Order.ASCENDING)
           .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
@@ -556,7 +561,7 @@ public class ScanQueryRunnerTest
       };
       final List<List<Map<String, Object>>> ascendingEvents = toEvents(
           new String[]{
-              legacy ? getTimestampName() + ":TIME" : null,
+              legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
               null,
               QueryRunnerTestHelper.qualityDimension + ":STRING",
               null,
@@ -565,9 +570,35 @@ public class ScanQueryRunnerTest
           },
           (String[]) ArrayUtils.addAll(seg1Results, seg2Results)
       );
+
+      if (legacy) {
+        for (List<Map<String, Object>> batch : ascendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", ((DateTime) 
event.get("timestamp")).getMillis());
+          }
+        }
+      } else {
+        for (List<Map<String, Object>> batch : ascendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", (DateTimes.of((String) 
event.get("__time"))).getMillis());
+          }
+        }
+      }
+
       List<ScanResultValue> ascendingExpectedResults = toExpected(
           ascendingEvents,
-          legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
+          legacy ?
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              getTimestampName(),
+              "quality",
+              "index"
+          ) :
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              "quality",
+              "index"
+          ),
           0,
           limit
       );
@@ -583,7 +614,11 @@ public class ScanQueryRunnerTest
       ScanQuery query = newTestQuery()
           .intervals(I_0112_0114)
           .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
-          .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
+          .columns(
+              QueryRunnerTestHelper.timeDimension,
+              QueryRunnerTestHelper.qualityDimension,
+              QueryRunnerTestHelper.indexMetric
+          )
           .limit(limit)
           .order(ScanQuery.Order.DESCENDING)
           .build();
@@ -616,7 +651,7 @@ public class ScanQueryRunnerTest
       ArrayUtils.reverse(expectedRet);
       final List<List<Map<String, Object>>> descendingEvents = toEvents(
           new String[]{
-              legacy ? getTimestampName() + ":TIME" : null,
+              legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
               null,
               QueryRunnerTestHelper.qualityDimension + ":STRING",
               null,
@@ -625,9 +660,34 @@ public class ScanQueryRunnerTest
           },
           expectedRet
       );
+      if (legacy) {
+        for (List<Map<String, Object>> batch : descendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", ((DateTime) 
event.get("timestamp")).getMillis());
+          }
+        }
+      } else {
+        for (List<Map<String, Object>> batch : descendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", (DateTimes.of((String) 
event.get("__time"))).getMillis());
+          }
+        }
+      }
       List<ScanResultValue> descendingExpectedResults = toExpected(
           descendingEvents,
-          legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
+          legacy ?
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              getTimestampName(),
+              // getTimestampName() always returns the legacy timestamp when legacy is true
+              "quality",
+              "index"
+          ) :
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              "quality",
+              "index"
+          ),
           0,
           limit
       );
@@ -666,7 +726,11 @@ public class ScanQueryRunnerTest
       ScanQuery query = newTestQuery()
           .intervals(I_0112_0114)
           .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
-          .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
+          .columns(
+              QueryRunnerTestHelper.timeDimension,
+              QueryRunnerTestHelper.qualityDimension,
+              QueryRunnerTestHelper.indexMetric
+          )
           .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
           .order(ScanQuery.Order.ASCENDING)
           .limit(limit)
@@ -676,7 +740,7 @@ public class ScanQueryRunnerTest
       Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
       final List<List<Map<String, Object>>> ascendingEvents = toEvents(
           new String[]{
-              legacy ? getTimestampName() + ":TIME" : null,
+              legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
               null,
               QueryRunnerTestHelper.qualityDimension + ":STRING",
               null,
@@ -685,9 +749,34 @@ public class ScanQueryRunnerTest
           },
           (String[]) ArrayUtils.addAll(seg1Results, seg2Results)
       );
+      if (legacy) {
+        for (List<Map<String, Object>> batch : ascendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", ((DateTime) 
event.get("timestamp")).getMillis());
+          }
+        }
+      } else {
+        for (List<Map<String, Object>> batch : ascendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", ((DateTimes.of((String) 
event.get("__time"))).getMillis()));
+          }
+        }
+      }
       List<ScanResultValue> ascendingExpectedResults = toExpected(
           ascendingEvents,
-          legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
+          legacy ?
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              getTimestampName(),
+              // getTimestampName() always returns the legacy timestamp when legacy is true
+              "quality",
+              "index"
+          ) :
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              "quality",
+              "index"
+          ),
           0,
           limit
       );
@@ -727,7 +816,11 @@ public class ScanQueryRunnerTest
       ScanQuery query = newTestQuery()
           .intervals(I_0112_0114)
           .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
-          .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
+          .columns(
+              QueryRunnerTestHelper.timeDimension,
+              QueryRunnerTestHelper.qualityDimension,
+              QueryRunnerTestHelper.indexMetric
+          )
           .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
           .order(ScanQuery.Order.DESCENDING)
           .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false))
@@ -740,7 +833,7 @@ public class ScanQueryRunnerTest
       ArrayUtils.reverse(expectedRet);
       final List<List<Map<String, Object>>> descendingEvents = toEvents(
           new String[]{
-              legacy ? getTimestampName() + ":TIME" : null,
+              legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME,
               null,
               QueryRunnerTestHelper.qualityDimension + ":STRING",
               null,
@@ -749,9 +842,34 @@ public class ScanQueryRunnerTest
           },
           expectedRet //segments in reverse order from above
       );
+      if (legacy) {
+        for (List<Map<String, Object>> batch : descendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", ((DateTime) 
event.get("timestamp")).getMillis());
+          }
+        }
+      } else {
+        for (List<Map<String, Object>> batch : descendingEvents) {
+          for (Map<String, Object> event : batch) {
+            event.put("__time", ((DateTimes.of((String) 
event.get("__time"))).getMillis()));
+          }
+        }
+      }
       List<ScanResultValue> descendingExpectedResults = toExpected(
           descendingEvents,
-          legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
+          legacy ?
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              getTimestampName(),
+              // getTimestampName() always returns the legacy timestamp when legacy is true
+              "quality",
+              "index"
+          ) :
+          Lists.newArrayList(
+              QueryRunnerTestHelper.timeDimension,
+              "quality",
+              "index"
+          ),
           0,
           limit
       );
@@ -760,7 +878,6 @@ public class ScanQueryRunnerTest
     }
   }
 
-
   private List<List<Map<String, Object>>> toFullEvents(final String[]... valueSet)
   {
     return toEvents(
@@ -799,71 +916,66 @@ public class ScanQueryRunnerTest
         Lists.newArrayList(
             Iterables.transform(
                 values,
-                new Function<String, Map<String, Object>>()
-                {
-                  @Override
-                  public Map<String, Object> apply(String input)
-                  {
-                    Map<String, Object> event = new HashMap<>();
-                    String[] values = input.split("\\t");
-                    for (int i = 0; i < dimSpecs.length; i++) {
-                      if (dimSpecs[i] == null || i >= dimSpecs.length) {
-                        continue;
-                      }
-
-                      // For testing metrics and virtual columns we have some special handling here, since
-                      // they don't appear in the source data.
-                      if (dimSpecs[i].equals(EXPR_COLUMN.getOutputName())) {
-                        event.put(
-                            EXPR_COLUMN.getOutputName(),
-                            (double) event.get(QueryRunnerTestHelper.indexMetric) * 2
-                        );
-                        continue;
-                      } else if (dimSpecs[i].equals("indexMin")) {
-                        event.put("indexMin", (double) 
event.get(QueryRunnerTestHelper.indexMetric));
-                        continue;
-                      } else if (dimSpecs[i].equals("indexFloat")) {
-                        event.put("indexFloat", (float) (double) 
event.get(QueryRunnerTestHelper.indexMetric));
-                        continue;
-                      } else if (dimSpecs[i].equals("indexMaxPlusTen")) {
-                        event.put("indexMaxPlusTen", (double) 
event.get(QueryRunnerTestHelper.indexMetric) + 10);
-                        continue;
-                      } else if (dimSpecs[i].equals("indexMinFloat")) {
-                        event.put("indexMinFloat", (float) (double) 
event.get(QueryRunnerTestHelper.indexMetric));
-                        continue;
-                      } else if (dimSpecs[i].equals("indexMaxFloat")) {
-                        event.put("indexMaxFloat", (float) (double) 
event.get(QueryRunnerTestHelper.indexMetric));
-                        continue;
-                      } else if (dimSpecs[i].equals("quality_uniques")) {
-                        final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
-                        collector.add(
-                            Hashing.murmur3_128()
-                                   .hashBytes(StringUtils.toUtf8((String) event.get("quality")))
-                                   .asBytes()
-                        );
-                        event.put("quality_uniques", collector);
-                      }
-
-                      if (i >= values.length) {
-                        continue;
-                      }
-
-                      String[] specs = dimSpecs[i].split(":");
+                input -> {
+                  Map<String, Object> event = new HashMap<>();
+                  String[] values1 = input.split("\\t");
+                  for (int i = 0; i < dimSpecs.length; i++) {
+                    if (dimSpecs[i] == null || i >= dimSpecs.length) {
+                      continue;
+                    }
 
+                    // For testing metrics and virtual columns we have some special handling here, since
+                    // they don't appear in the source data.
+                    if (dimSpecs[i].equals(EXPR_COLUMN.getOutputName())) {
                       event.put(
-                          specs[0],
-                          specs.length == 1 || specs[1].equals("STRING") ? values[i] :
-                          specs[1].equals("TIME") ? toTimestamp(values[i]) :
-                          specs[1].equals("FLOAT") ? Float.valueOf(values[i]) :
-                          specs[1].equals("DOUBLE") ? 
Double.valueOf(values[i]) :
-                          specs[1].equals("LONG") ? Long.valueOf(values[i]) :
-                          specs[1].equals("NULL") ? null :
-                          specs[1].equals("STRINGS") ? 
Arrays.asList(values[i].split("\u0001")) :
-                          values[i]
+                          EXPR_COLUMN.getOutputName(),
+                          (double) event.get(QueryRunnerTestHelper.indexMetric) * 2
                       );
+                      continue;
+                    } else if (dimSpecs[i].equals("indexMin")) {
+                      event.put("indexMin", (double) 
event.get(QueryRunnerTestHelper.indexMetric));
+                      continue;
+                    } else if (dimSpecs[i].equals("indexFloat")) {
+                      event.put("indexFloat", (float) (double) 
event.get(QueryRunnerTestHelper.indexMetric));
+                      continue;
+                    } else if (dimSpecs[i].equals("indexMaxPlusTen")) {
+                      event.put("indexMaxPlusTen", (double) 
event.get(QueryRunnerTestHelper.indexMetric) + 10);
+                      continue;
+                    } else if (dimSpecs[i].equals("indexMinFloat")) {
+                      event.put("indexMinFloat", (float) (double) 
event.get(QueryRunnerTestHelper.indexMetric));
+                      continue;
+                    } else if (dimSpecs[i].equals("indexMaxFloat")) {
+                      event.put("indexMaxFloat", (float) (double) 
event.get(QueryRunnerTestHelper.indexMetric));
+                      continue;
+                    } else if (dimSpecs[i].equals("quality_uniques")) {
+                      final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
+                      collector.add(
+                          Hashing.murmur3_128()
+                                 .hashBytes(StringUtils.toUtf8((String) event.get("quality")))
+                                 .asBytes()
+                      );
+                      event.put("quality_uniques", collector);
+                    }
+
+                    if (i >= values1.length) {
+                      continue;
                     }
-                    return event;
+
+                    String[] specs = dimSpecs[i].split(":");
+
+                    event.put(
+                        specs[0],
+                        specs.length == 1 || specs[1].equals("STRING") ? values1[i] :
+                        specs[1].equals("TIME") ? toTimestamp(values1[i]) :
+                        specs[1].equals("FLOAT") ? Float.valueOf(values1[i]) :
+                        specs[1].equals("DOUBLE") ? Double.valueOf(values1[i]) 
:
+                        specs[1].equals("LONG") ? Long.valueOf(values1[i]) :
+                        specs[1].equals("NULL") ? null :
+                        specs[1].equals("STRINGS") ? 
Arrays.asList(values1[i].split("\u0001")) :
+                        values1[i]
+                    );
                   }
+                  return event;
                 }
             )
         )
@@ -969,7 +1081,6 @@ public class ScanQueryRunnerTest
           } else {
             Assert.assertEquals("invalid value for " + ac.getKey(), exVal, 
actVal);
           }
-
         }
       }
 
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java
new file mode 100644
index 0000000..1854883
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+public class ScanQueryTest
+{
+  private static QuerySegmentSpec intervalSpec;
+  private static ScanResultValue s1;
+  private static ScanResultValue s2;
+  private static ScanResultValue s3;
+
+  @BeforeClass
+  public static void setup()
+  {
+    intervalSpec = new MultipleIntervalSegmentSpec(
+        Collections.singletonList(
+            new Interval(DateTimes.of("2012-01-01"), 
DateTimes.of("2012-01-01").plusHours(1))
+        )
+    );
+
+    ArrayList<HashMap<String, Object>> events1 = new ArrayList<>();
+    HashMap<String, Object> event1 = new HashMap<>();
+    event1.put(ColumnHolder.TIME_COLUMN_NAME, new Long(42));
+    events1.add(event1);
+
+    s1 = new ScanResultValue(
+        "segmentId",
+        Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
+        events1
+    );
+
+    ArrayList<HashMap<String, Object>> events2 = new ArrayList<>();
+    HashMap<String, Object> event2 = new HashMap<>();
+    event2.put(ColumnHolder.TIME_COLUMN_NAME, new Long(43));
+    events2.add(event2);
+
+    s2 = new ScanResultValue(
+        "segmentId",
+        Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
+        events2
+    );
+
+    // ScanResultValue s3 has no time column
+    ArrayList<HashMap<String, Object>> events3 = new ArrayList<>();
+    HashMap<String, Object> event3 = new HashMap<>();
+    event3.put("yah", "yeet");
+    events3.add(event3);
+
+    s3 = new ScanResultValue(
+        "segmentId",
+        Collections.singletonList("yah"),
+        events3
+    );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testAscendingScanQueryWithInvalidColumns()
+  {
+    Druids.newScanQueryBuilder()
+          .order(ScanQuery.Order.ASCENDING)
+          .columns(ImmutableList.of("not time", "also not time"))
+          .dataSource("source")
+          .intervals(intervalSpec)
+          .build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDescendingScanQueryWithInvalidColumns()
+  {
+    Druids.newScanQueryBuilder()
+          .order(ScanQuery.Order.DESCENDING)
+          .columns(ImmutableList.of("not time", "also not time"))
+          .dataSource("source")
+          .intervals(intervalSpec)
+          .build();
+  }
+
+  // No assertions because we're checking that no IllegalArgumentExceptions are thrown
+  @Test
+  public void testValidScanQueryInitialization()
+  {
+    List<ScanQuery.Order> nonOrderedOrders = Arrays.asList(null, ScanQuery.Order.NONE);
+
+    for (ScanQuery.Order order : nonOrderedOrders) {
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .columns(ImmutableList.of("not time"))
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+
+
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .columns(ImmutableList.of())
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .columns(ImmutableList.of("__time"))
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+    }
+
+    Set<ScanQuery.Order> orderedOrders = ImmutableSet.of(ScanQuery.Order.ASCENDING, ScanQuery.Order.DESCENDING);
+
+    for (ScanQuery.Order order : orderedOrders) {
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .columns((List<String>) null)
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .columns(ImmutableList.of())
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+
+      Druids.newScanQueryBuilder()
+            .order(order)
+            .columns(ImmutableList.of("__time", "col2"))
+            .dataSource("source")
+            .intervals(intervalSpec)
+            .build();
+    }
+  }
+
+  // Validates that getResultOrdering will work for the broker n-way merge
+  @Test
+  public void testMergeSequenceForResults()
+  {
+    // Should be able to handle merging s1, s2, s3
+    ScanQuery noOrderScan = Druids.newScanQueryBuilder()
+                                  .order(ScanQuery.Order.NONE)
+                                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+                                  .dataSource("some src")
+                                  .intervals(intervalSpec)
+                                  .build();
+
+    // Should only handle s1 and s2
+    ScanQuery descendingOrderScan = Druids.newScanQueryBuilder()
+                                          .order(ScanQuery.Order.DESCENDING)
+                                          .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+                                          .dataSource("some src")
+                                          .intervals(intervalSpec)
+                                          .build();
+
+    // Should only handle s1 and s2
+    ScanQuery ascendingOrderScan = Druids.newScanQueryBuilder()
+                                         .order(ScanQuery.Order.ASCENDING)
+                                         .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+                                         .dataSource("some src")
+                                         .intervals(intervalSpec)
+                                         .build();
+    // No Order
+    Sequence<ScanResultValue> noOrderSeq =
+        Sequences.simple(
+            ImmutableList.of(
+                Sequences.simple(ImmutableList.of(s1, s3)),
+                Sequences.simple(ImmutableList.of(s2))
+            )
+        ).flatMerge(seq -> seq, noOrderScan.getResultOrdering());
+
+    List<ScanResultValue> noOrderList = noOrderSeq.toList();
+    Assert.assertEquals(3, noOrderList.size());
+
+
+    // Ascending
+    Sequence<ScanResultValue> ascendingOrderSeq = Sequences.simple(
+        ImmutableList.of(
+            Sequences.simple(ImmutableList.of(s1)),
+            Sequences.simple(ImmutableList.of(s2))
+        )
+    ).flatMerge(seq -> seq, ascendingOrderScan.getResultOrdering());
+
+    List<ScanResultValue> ascendingList = ascendingOrderSeq.toList();
+    Assert.assertEquals(2, ascendingList.size());
+    Assert.assertEquals(s1, ascendingList.get(0));
+    Assert.assertEquals(s2, ascendingList.get(1));
+
+    // Descending
+    Sequence<ScanResultValue> descendingOrderSeq = Sequences.simple(
+        ImmutableList.of(
+            Sequences.simple(ImmutableList.of(s1)),
+            Sequences.simple(ImmutableList.of(s2))
+        )
+    ).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering());
+
+    List<ScanResultValue> descendingList = descendingOrderSeq.toList();
+    Assert.assertEquals(2, descendingList.size());
+    Assert.assertEquals(s2, descendingList.get(0));
+    Assert.assertEquals(s1, descendingList.get(1));
+  }
+
+  @Test(expected = ISE.class)
+  public void testTimeOrderingWithoutTimeColumn()
+  {
+    ScanQuery descendingOrderScan = Druids.newScanQueryBuilder()
+                                          .order(ScanQuery.Order.DESCENDING)
+                                          .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+                                          .dataSource("some src")
+                                          .intervals(intervalSpec)
+                                          .build();
+    // This should fail because s3 doesn't have a timestamp
+    Sequence<ScanResultValue> borkedSequence = Sequences.simple(
+        ImmutableList.of(
+            Sequences.simple(ImmutableList.of(s1)),
+            Sequences.simple(ImmutableList.of(s2, s3))
+        )
+    ).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering());
+
+    // This should throw an ISE
+    List<ScanResultValue> res = borkedSequence.toList();
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java
index 70f2e08..465794a 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java
@@ -49,7 +49,7 @@ public class ScanResultValueTimestampComparatorTest
   }
 
   @Test
-  public void comparisonDescendingListTest()
+  public void testComparisonDescendingList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.DESCENDING)
@@ -86,7 +86,7 @@ public class ScanResultValueTimestampComparatorTest
   }
 
   @Test
-  public void comparisonAscendingListTest()
+  public void testComparisonAscendingList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.ASCENDING)
@@ -123,7 +123,7 @@ public class ScanResultValueTimestampComparatorTest
   }
 
   @Test
-  public void comparisonDescendingCompactedListTest()
+  public void testComparisonDescendingCompactedList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.DESCENDING)
@@ -158,7 +158,7 @@ public class ScanResultValueTimestampComparatorTest
   }
 
   @Test
-  public void comparisonAscendingCompactedListTest()
+  public void testComparisonAscendingCompactedList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.ASCENDING)

