fjy closed pull request #6230: [Backport] Fix four bugs with numeric dimension 
output types.
URL: https://github.com/apache/incubator-druid/pull/6230
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java 
b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
index 1f9b45e62eb..7d3dbc0844b 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -58,6 +58,7 @@
 import io.druid.query.ordering.StringComparators;
 import io.druid.query.spec.LegacySegmentSpec;
 import io.druid.query.spec.QuerySegmentSpec;
+import io.druid.segment.DimensionHandlerUtils;
 import io.druid.segment.VirtualColumn;
 import io.druid.segment.VirtualColumns;
 import io.druid.segment.column.Column;
@@ -377,7 +378,7 @@ public boolean determineApplyLimitPushDown()
     final List<String> orderedFieldNames = new ArrayList<>();
     final Set<Integer> dimsInOrderBy = new HashSet<>();
     final List<Boolean> needsReverseList = new ArrayList<>();
-    final List<Boolean> isNumericField = new ArrayList<>();
+    final List<ValueType> dimensionTypes = new ArrayList<>();
     final List<StringComparator> comparators = new ArrayList<>();
 
     for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
@@ -389,7 +390,7 @@ public boolean determineApplyLimitPushDown()
         dimsInOrderBy.add(dimIndex);
         needsReverseList.add(needsReverse);
         final ValueType type = dimensions.get(dimIndex).getOutputType();
-        isNumericField.add(ValueType.isNumeric(type));
+        dimensionTypes.add(type);
         comparators.add(orderSpec.getDimensionComparator());
       }
     }
@@ -399,7 +400,7 @@ public boolean determineApplyLimitPushDown()
         orderedFieldNames.add(dimensions.get(i).getOutputName());
         needsReverseList.add(false);
         final ValueType type = dimensions.get(i).getOutputType();
-        isNumericField.add(ValueType.isNumeric(type));
+        dimensionTypes.add(type);
         comparators.add(StringComparators.LEXICOGRAPHIC);
       }
     }
@@ -416,7 +417,7 @@ public int compare(Row lhs, Row rhs)
               return compareDimsForLimitPushDown(
                   orderedFieldNames,
                   needsReverseList,
-                  isNumericField,
+                  dimensionTypes,
                   comparators,
                   lhs,
                   rhs
@@ -434,7 +435,7 @@ public int compare(Row lhs, Row rhs)
               final int cmp = compareDimsForLimitPushDown(
                   orderedFieldNames,
                   needsReverseList,
-                  isNumericField,
+                  dimensionTypes,
                   comparators,
                   lhs,
                   rhs
@@ -463,7 +464,7 @@ public int compare(Row lhs, Row rhs)
               return compareDimsForLimitPushDown(
                   orderedFieldNames,
                   needsReverseList,
-                  isNumericField,
+                  dimensionTypes,
                   comparators,
                   lhs,
                   rhs
@@ -530,28 +531,12 @@ public int compare(Row lhs, Row rhs)
   private static int compareDims(List<DimensionSpec> dimensions, Row lhs, Row 
rhs)
   {
     for (DimensionSpec dimension : dimensions) {
-      final int dimCompare;
-      if (dimension.getOutputType() == ValueType.LONG) {
-        dimCompare = Long.compare(
-            ((Number) lhs.getRaw(dimension.getOutputName())).longValue(),
-            ((Number) rhs.getRaw(dimension.getOutputName())).longValue()
-        );
-      } else if (dimension.getOutputType() == ValueType.FLOAT) {
-        dimCompare = Float.compare(
-            ((Number) lhs.getRaw(dimension.getOutputName())).floatValue(),
-            ((Number) rhs.getRaw(dimension.getOutputName())).floatValue()
-        );
-      } else if (dimension.getOutputType() == ValueType.DOUBLE) {
-        dimCompare = Double.compare(
-            ((Number) lhs.getRaw(dimension.getOutputName())).doubleValue(),
-            ((Number) rhs.getRaw(dimension.getOutputName())).doubleValue()
-        );
-      } else {
-        dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
-            lhs.getRaw(dimension.getOutputName()),
-            rhs.getRaw(dimension.getOutputName())
-        );
-      }
+      //noinspection unchecked
+      final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
+          lhs.getRaw(dimension.getOutputName()),
+          rhs.getRaw(dimension.getOutputName()),
+          dimension.getOutputType()
+      );
       if (dimCompare != 0) {
         return dimCompare;
       }
@@ -563,7 +548,7 @@ private static int compareDims(List<DimensionSpec> 
dimensions, Row lhs, Row rhs)
   private static int compareDimsForLimitPushDown(
       final List<String> fields,
       final List<Boolean> needsReverseList,
-      final List<Boolean> isNumericField,
+      final List<ValueType> dimensionTypes,
       final List<StringComparator> comparators,
       Row lhs,
       Row rhs
@@ -572,17 +557,15 @@ private static int compareDimsForLimitPushDown(
     for (int i = 0; i < fields.size(); i++) {
       final String fieldName = fields.get(i);
       final StringComparator comparator = comparators.get(i);
+      final ValueType dimensionType = dimensionTypes.get(i);
 
       final int dimCompare;
       final Object lhsObj = lhs.getRaw(fieldName);
       final Object rhsObj = rhs.getRaw(fieldName);
 
-      if (isNumericField.get(i)) {
+      if (ValueType.isNumeric(dimensionType)) {
         if (comparator.equals(StringComparators.NUMERIC)) {
-          dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
-              lhsObj,
-              rhsObj
-          );
+          dimCompare = DimensionHandlerUtils.compareObjectsAsType(lhsObj, 
rhsObj, dimensionType);
         } else {
           dimCompare = comparator.compare(String.valueOf(lhsObj), 
String.valueOf(rhsObj));
         }
diff --git 
a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
 
b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index 28d5acb42de..04607812763 100644
--- 
a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ 
b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -57,6 +57,7 @@
 import io.druid.query.groupby.resource.GroupByQueryResource;
 import io.druid.query.groupby.strategy.GroupByStrategy;
 import io.druid.query.groupby.strategy.GroupByStrategySelector;
+import io.druid.segment.DimensionHandlerUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -451,8 +452,13 @@ public Row apply(Object input)
             Map<String, Object> event = Maps.newLinkedHashMap();
             Iterator<DimensionSpec> dimsIter = dims.iterator();
             while (dimsIter.hasNext() && results.hasNext()) {
-              final DimensionSpec factory = dimsIter.next();
-              event.put(factory.getOutputName(), results.next());
+              final DimensionSpec dimensionSpec = dimsIter.next();
+
+              // Must convert generic Jackson-deserialized type into the 
proper type.
+              event.put(
+                  dimensionSpec.getOutputName(),
+                  DimensionHandlerUtils.convertObjectToType(results.next(), 
dimensionSpec.getOutputType())
+              );
             }
 
             Iterator<AggregatorFactory> aggsIter = aggs.iterator();
diff --git 
a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
 
b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index dffc4f62603..d5703350086 100644
--- 
a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ 
b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -682,28 +682,7 @@ private static void 
convertRowTypesToOutputTypes(List<DimensionSpec> dimensionSp
       final ValueType outputType = dimSpec.getOutputType();
       rowMap.compute(
           dimSpec.getOutputName(),
-          (dimName, baseVal) -> {
-            switch (outputType) {
-              case STRING:
-                baseVal = baseVal == null ? "" : baseVal.toString();
-                break;
-              case LONG:
-                baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal);
-                baseVal = baseVal == null ? 0L : baseVal;
-                break;
-              case FLOAT:
-                baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal);
-                baseVal = baseVal == null ? 0.f : baseVal;
-                break;
-              case DOUBLE:
-                baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal);
-                baseVal = baseVal == null ? 0.d : baseVal;
-                break;
-              default:
-                throw new IAE("Unsupported type: " + outputType);
-            }
-            return baseVal;
-          }
+          (dimName, baseVal) -> 
DimensionHandlerUtils.convertObjectToTypeNonNull(baseVal, outputType)
       );
     }
   }
diff --git 
a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
 
b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index 290cf13753b..fa4f2a95aea 100644
--- 
a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ 
b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -602,38 +602,10 @@ public InputRawSupplierColumnSelectorStrategy 
makeColumnSelectorStrategy(
   {
     final Function<Comparable, Comparable>[] functions = new 
Function[valueTypes.size()];
     for (int i = 0; i < functions.length; i++) {
-      ValueType type = valueTypes.get(i);
       // Subquery post-aggs aren't added to the rowSignature (see 
rowSignatureFor() in GroupByQueryHelper) because
       // their types aren't known, so default to String handling.
-      type = type == null ? ValueType.STRING : type;
-      switch (type) {
-        case STRING:
-          functions[i] = input -> input == null ? "" : input.toString();
-          break;
-
-        case LONG:
-          functions[i] = input -> {
-            final Long val = DimensionHandlerUtils.convertObjectToLong(input);
-            return val == null ? 0L : val;
-          };
-          break;
-
-        case FLOAT:
-          functions[i] = input -> {
-            final Float val = 
DimensionHandlerUtils.convertObjectToFloat(input);
-            return val == null ? 0.f : val;
-          };
-          break;
-
-        case DOUBLE:
-          functions[i] = input -> {
-            Double val = DimensionHandlerUtils.convertObjectToDouble(input);
-            return val == null ? 0.0 : val;
-          };
-          break;
-        default:
-          throw new IAE("invalid type: [%s]", type);
-      }
+      final ValueType type = valueTypes.get(i) == null ? ValueType.STRING : 
valueTypes.get(i);
+      functions[i] = input -> 
DimensionHandlerUtils.convertObjectToTypeNonNull(input, type);
     }
     return functions;
   }
diff --git 
a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java 
b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java
index 47373721c18..77ba5defa0e 100644
--- 
a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java
+++ 
b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
 import io.druid.query.ColumnSelectorPlus;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.topn.types.TopNColumnSelectorStrategy;
@@ -110,14 +109,8 @@ protected void updateResults(
   )
   {
     final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = 
params.getSelectorPlus();
-    final boolean needsResultTypeConversion = 
needsResultTypeConversion(params);
-    final Function<Object, Object> valueTransformer = 
TopNMapFn.getValueTransformer(
-        query.getDimensionSpec().getOutputType()
-    );
-
     selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
         aggregatesStore,
-        needsResultTypeConversion ? valueTransformer : null,
         resultBuilder
     );
   }
@@ -136,11 +129,4 @@ protected void closeAggregators(Map<Comparable, 
Aggregator[]> valueMap)
   public void cleanup(TopNParams params)
   {
   }
-
-  private boolean needsResultTypeConversion(TopNParams params)
-  {
-    ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = 
params.getSelectorPlus();
-    TopNColumnSelectorStrategy strategy = 
selectorPlus.getColumnSelectorStrategy();
-    return query.getDimensionSpec().getOutputType() != strategy.getValueType();
-  }
 }
diff --git a/processing/src/main/java/io/druid/query/topn/DimValHolder.java 
b/processing/src/main/java/io/druid/query/topn/DimValHolder.java
index f983d8960d8..cc96767923f 100644
--- a/processing/src/main/java/io/druid/query/topn/DimValHolder.java
+++ b/processing/src/main/java/io/druid/query/topn/DimValHolder.java
@@ -26,19 +26,19 @@
 public class DimValHolder
 {
   private final Object topNMetricVal;
-  private final Comparable dimName;
+  private final Comparable dimValue;
   private final Object dimValIndex;
   private final Map<String, Object> metricValues;
 
   public DimValHolder(
       Object topNMetricVal,
-      Comparable dimName,
+      Comparable dimValue,
       Object dimValIndex,
       Map<String, Object> metricValues
   )
   {
     this.topNMetricVal = topNMetricVal;
-    this.dimName = dimName;
+    this.dimValue = dimValue;
     this.dimValIndex = dimValIndex;
     this.metricValues = metricValues;
   }
@@ -48,9 +48,9 @@ public Object getTopNMetricVal()
     return topNMetricVal;
   }
 
-  public Comparable getDimName()
+  public Comparable getDimValue()
   {
-    return dimName;
+    return dimValue;
   }
 
   public Object getDimValIndex()
@@ -66,14 +66,14 @@ public Object getDimValIndex()
   public static class Builder
   {
     private Object topNMetricVal;
-    private Comparable dimName;
+    private Comparable dimValue;
     private Object dimValIndex;
     private Map<String, Object> metricValues;
 
     public Builder()
     {
       topNMetricVal = null;
-      dimName = null;
+      dimValue = null;
       dimValIndex = null;
       metricValues = null;
     }
@@ -84,9 +84,9 @@ public Builder withTopNMetricVal(Object topNMetricVal)
       return this;
     }
 
-    public Builder withDimName(Comparable dimName)
+    public Builder withDimValue(Comparable dimValue)
     {
-      this.dimName = dimName;
+      this.dimValue = dimValue;
       return this;
     }
 
@@ -104,7 +104,7 @@ public Builder withMetricValues(Map<String, Object> 
metricValues)
 
     public DimValHolder build()
     {
-      return new DimValHolder(topNMetricVal, dimName, dimValIndex, 
metricValues);
+      return new DimValHolder(topNMetricVal, dimValue, dimValIndex, 
metricValues);
     }
   }
 }
diff --git 
a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java 
b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java
index c191f4a6d7a..b1e2d92fa55 100644
--- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java
+++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java
@@ -20,7 +20,6 @@
 package io.druid.query.topn;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import io.druid.collections.NonBlockingPool;
 import io.druid.collections.ResourceHolder;
@@ -37,7 +36,6 @@
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.FilteredOffset;
 import io.druid.segment.StorageAdapter;
-import io.druid.segment.column.ValueType;
 import io.druid.segment.data.IndexedInts;
 import io.druid.segment.data.Offset;
 import io.druid.segment.historical.HistoricalColumnSelector;
@@ -736,10 +734,6 @@ protected void updateResults(
     final int[] aggregatorSizes = params.getAggregatorSizes();
     final DimensionSelector dimSelector = params.getDimSelector();
 
-    final ValueType outType = query.getDimensionSpec().getOutputType();
-    final boolean needsResultConversion = outType != ValueType.STRING;
-    final Function<Object, Object> valueTransformer = 
TopNMapFn.getValueTransformer(outType);
-
     for (int i = 0; i < positions.length; i++) {
       int position = positions[i];
       if (position >= 0) {
@@ -749,14 +743,9 @@ protected void updateResults(
           position += aggregatorSizes[j];
         }
 
-        Object retVal = dimSelector.lookupName(i);
-        if (needsResultConversion) {
-          retVal = valueTransformer.apply(retVal);
-        }
-
-
+        // Output type must be STRING in order for PooledTopNAlgorithm to make 
sense; so no need to convert value.
         resultBuilder.addEntry(
-            (Comparable) retVal,
+            dimSelector.lookupName(i),
             i,
             vals
         );
@@ -854,18 +843,6 @@ public int getNumBytesPerRecord()
       private int numValuesPerPass;
       private TopNMetricSpecBuilder<int[]> arrayProvider;
 
-      public Builder()
-      {
-        selectorPlus = null;
-        cursor = null;
-        resultsBufHolder = null;
-        resultsBuf = null;
-        aggregatorSizes = null;
-        numBytesPerRecord = 0;
-        numValuesPerPass = 0;
-        arrayProvider = null;
-      }
-
       public Builder withSelectorPlus(ColumnSelectorPlus selectorPlus)
       {
         this.selectorPlus = selectorPlus;
diff --git 
a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java
 
b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java
index 4cfc39ea085..b402c597e2d 100644
--- 
a/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java
+++ 
b/processing/src/main/java/io/druid/query/topn/TopNLexicographicResultBuilder.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.druid.query.Result;
@@ -66,32 +65,22 @@ public TopNLexicographicResultBuilder(
     this.threshold = threshold;
     this.pQueue = new PriorityQueue<>(
         threshold + 1,
-        new Comparator<DimValHolder>()
-        {
-          @Override
-          public int compare(
-              DimValHolder o1,
-              DimValHolder o2
-          )
-          {
-            return comparator.compare(o2.getDimName(), o1.getDimName());
-          }
-        }
+        (o1, o2) -> comparator.compare(o2.getDimValue(), o1.getDimValue())
     );
   }
 
   @Override
   public TopNResultBuilder addEntry(
-      Comparable dimNameObj,
+      Comparable dimValueObj,
       Object dimValIndex,
       Object[] metricVals
   )
   {
-    final String dimName = Objects.toString(dimNameObj, null);
+    final String dimValue = Objects.toString(dimValueObj, null);
     final Map<String, Object> metricValues = 
Maps.newHashMapWithExpectedSize(metricVals.length + 1);
 
-    if (shouldAdd(dimName)) {
-      metricValues.put(dimSpec.getOutputName(), dimName);
+    if (shouldAdd(dimValue)) {
+      metricValues.put(dimSpec.getOutputName(), dimValueObj);
       final int extra = metricVals.length % LOOP_UNROLL_COUNT;
       switch (extra) {
         case 7:
@@ -126,7 +115,7 @@ public TopNResultBuilder addEntry(
         metricValues.put(aggFactoryNames[i + 7], metricVals[i + 7]);
       }
 
-      pQueue.add(new 
DimValHolder.Builder().withDimName(dimName).withMetricValues(metricValues).build());
+      pQueue.add(new 
DimValHolder.Builder().withDimValue(dimValue).withMetricValues(metricValues).build());
       if (pQueue.size() > threshold) {
         pQueue.poll();
       }
@@ -143,7 +132,7 @@ public TopNResultBuilder 
addEntry(DimensionAndMetricValueExtractor dimensionAndM
 
     if (shouldAdd(dimensionValue)) {
       pQueue.add(
-          new DimValHolder.Builder().withDimName(dimensionValue)
+          new DimValHolder.Builder().withDimValue(dimensionValue)
                                     
.withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
                                     .build()
       );
@@ -167,30 +156,11 @@ public TopNResultBuilder 
addEntry(DimensionAndMetricValueExtractor dimensionAndM
     final DimValHolder[] holderValueArray = pQueue.toArray(new 
DimValHolder[0]);
     Arrays.sort(
         holderValueArray,
-        new Comparator<DimValHolder>()
-        {
-          @Override
-          public int compare(DimValHolder o1, DimValHolder o2)
-          {
-            return comparator.compare(o1.getDimName(), o2.getDimName());
-          }
-        }
-
+        (o1, o2) -> comparator.compare(o1.getDimValue(), o2.getDimValue())
     );
-    return new Result(
-        timestamp, new TopNResultValue(
-        Lists.transform(
-            Arrays.asList(holderValueArray),
-            new Function<DimValHolder, Object>()
-            {
-              @Override
-              public Object apply(DimValHolder dimValHolder)
-              {
-                return dimValHolder.getMetricValues();
-              }
-            }
-        )
-    )
+    return new Result<>(
+        timestamp,
+        new TopNResultValue(Lists.transform(Arrays.asList(holderValueArray), 
DimValHolder::getMetricValues))
     );
   }
 
diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java 
b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java
index 24d05fa3777..7bdb086f96a 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java
@@ -19,54 +19,16 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
-import io.druid.java.util.common.IAE;
 import io.druid.query.ColumnSelectorPlus;
 import io.druid.query.Result;
 import io.druid.query.topn.types.TopNColumnSelectorStrategyFactory;
 import io.druid.segment.Cursor;
 import io.druid.segment.DimensionHandlerUtils;
-import io.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
-import java.util.Objects;
 
 public class TopNMapFn
 {
-  public static Function<Object, Object> getValueTransformer(ValueType 
outputType)
-  {
-    switch (outputType) {
-      case STRING:
-        return STRING_TRANSFORMER;
-      case LONG:
-        return LONG_TRANSFORMER;
-      case FLOAT:
-        return FLOAT_TRANSFORMER;
-      case DOUBLE:
-        return DOUBLE_TRANSFORMER;
-      default:
-        throw new IAE("invalid type: %s", outputType);
-    }
-  }
-
-  private static Function<Object, Object> STRING_TRANSFORMER = input -> 
Objects.toString(input, null);
-
-  private static Function<Object, Object> LONG_TRANSFORMER = input -> {
-    final Long longVal = DimensionHandlerUtils.convertObjectToLong(input);
-    return longVal == null ? DimensionHandlerUtils.ZERO_LONG : longVal;
-  };
-
-  private static Function<Object, Object> FLOAT_TRANSFORMER = input -> {
-    final Float floatVal = DimensionHandlerUtils.convertObjectToFloat(input);
-    return floatVal == null ? DimensionHandlerUtils.ZERO_FLOAT : floatVal;
-  };
-  private static Function<Object, Object> DOUBLE_TRANSFORMER = input -> {
-    final Double doubleValue = 
DimensionHandlerUtils.convertObjectToDouble(input);
-    return doubleValue == null ? DimensionHandlerUtils.ZERO_DOUBLE : 
doubleValue;
-  };
-
-  private static final TopNColumnSelectorStrategyFactory STRATEGY_FACTORY = 
new TopNColumnSelectorStrategyFactory();
-
   private final TopNQuery query;
   private final TopNAlgorithm topNAlgorithm;
 
@@ -83,7 +45,7 @@ public TopNMapFn(
   public Result<TopNResultValue> apply(final Cursor cursor, final @Nullable 
TopNQueryMetrics queryMetrics)
   {
     final ColumnSelectorPlus selectorPlus = 
DimensionHandlerUtils.createColumnSelectorPlus(
-        STRATEGY_FACTORY,
+        new 
TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()),
         query.getDimensionSpec(),
         cursor.getColumnSelectorFactory()
     );
diff --git 
a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java 
b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java
index fd5aa3f3cb4..9fecef4cedb 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNNumericResultBuilder.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -48,9 +47,9 @@
   private final String metricName;
   private final List<PostAggregator> postAggs;
   private final PriorityQueue<DimValHolder> pQueue;
-  private final Comparator<DimValHolder> dimValComparator;
+  private final Comparator<DimValHolder> dimValHolderComparator;
   private final String[] aggFactoryNames;
-  private static final Comparator<Comparable> dimNameComparator = new 
Comparator<Comparable>()
+  private static final Comparator<Comparable> dimValueComparator = new 
Comparator<Comparable>()
   {
     @Override
     public int compare(Comparable o1, Comparable o2)
@@ -65,6 +64,7 @@ public int compare(Comparable o1, Comparable o2)
       } else if (null == o2) {
         retval = 1;
       } else {
+        //noinspection unchecked
         retval = o1.compareTo(o2);
       }
       return retval;
@@ -91,30 +91,26 @@ public TopNNumericResultBuilder(
     this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, 
this.metricName);
     this.threshold = threshold;
     this.metricComparator = comparator;
-    this.dimValComparator = new Comparator<DimValHolder>()
-    {
-      @Override
-      public int compare(DimValHolder d1, DimValHolder d2)
-      {
-        int retVal = metricComparator.compare(d1.getTopNMetricVal(), 
d2.getTopNMetricVal());
-
-        if (retVal == 0) {
-          retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());
-        }
+    this.dimValHolderComparator = (d1, d2) -> {
+      //noinspection unchecked
+      int retVal = metricComparator.compare(d1.getTopNMetricVal(), 
d2.getTopNMetricVal());
 
-        return retVal;
+      if (retVal == 0) {
+        retVal = dimValueComparator.compare(d1.getDimValue(), 
d2.getDimValue());
       }
+
+      return retVal;
     };
 
     // The logic in addEntry first adds, then removes if needed. So it can at 
any point have up to threshold + 1 entries.
-    pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator);
+    pQueue = new PriorityQueue<>(this.threshold + 1, dimValHolderComparator);
   }
 
   private static final int LOOP_UNROLL_COUNT = 8;
 
   @Override
   public TopNNumericResultBuilder addEntry(
-      Comparable dimName,
+      Comparable dimValueObj,
       Object dimValIndex,
       Object[] metricVals
   )
@@ -126,7 +122,7 @@ public TopNNumericResultBuilder addEntry(
 
     final Map<String, Object> metricValues = 
Maps.newHashMapWithExpectedSize(metricVals.length + postAggs.size() + 1);
 
-    metricValues.put(dimSpec.getOutputName(), dimName);
+    metricValues.put(dimSpec.getOutputName(), dimValueObj);
 
     final int extra = metricVals.length % LOOP_UNROLL_COUNT;
 
@@ -173,7 +169,7 @@ public TopNNumericResultBuilder addEntry(
     if (shouldAdd(topNMetricVal)) {
       DimValHolder dimValHolder = new DimValHolder.Builder()
           .withTopNMetricVal(topNMetricVal)
-          .withDimName(dimName)
+          .withDimValue(dimValueObj)
           .withDimValIndex(dimValIndex)
           .withMetricValues(metricValues)
           .build();
@@ -202,7 +198,7 @@ public TopNResultBuilder 
addEntry(DimensionAndMetricValueExtractor dimensionAndM
     if (shouldAdd(dimValue)) {
       final DimValHolder valHolder = new DimValHolder.Builder()
           .withTopNMetricVal(dimValue)
-          .withDimName((Comparable) 
dimensionAndMetricValueExtractor.getDimensionValue(dimSpec.getOutputName()))
+          .withDimValue((Comparable) 
dimensionAndMetricValueExtractor.getDimensionValue(dimSpec.getOutputName()))
           .withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
           .build();
       pQueue.add(valHolder);
@@ -224,39 +220,24 @@ public TopNResultBuilder 
addEntry(DimensionAndMetricValueExtractor dimensionAndM
   {
     final DimValHolder[] holderValueArray = pQueue.toArray(new 
DimValHolder[0]);
     Arrays.sort(
-        holderValueArray, new Comparator<DimValHolder>()
-        {
-          @Override
-          public int compare(DimValHolder d1, DimValHolder d2)
-          {
-            // Values flipped compared to earlier
-            int retVal = metricComparator.compare(d2.getTopNMetricVal(), 
d1.getTopNMetricVal());
+        holderValueArray,
+        (d1, d2) -> {
+          // Metric values flipped compared to dimValueHolderComparator.
 
-            if (retVal == 0) {
-              retVal = dimNameComparator.compare(d1.getDimName(), 
d2.getDimName());
-            }
+          //noinspection unchecked
+          int retVal = metricComparator.compare(d2.getTopNMetricVal(), 
d1.getTopNMetricVal());
 
-            return retVal;
+          if (retVal == 0) {
+            retVal = dimValueComparator.compare(d1.getDimValue(), 
d2.getDimValue());
           }
+
+          return retVal;
         }
     );
     List<DimValHolder> holderValues = Arrays.asList(holderValueArray);
 
     // Pull out top aggregated values
-    final List<Map<String, Object>> values = Lists.transform(
-        holderValues,
-        new Function<DimValHolder, Map<String, Object>>()
-        {
-          @Override
-          public Map<String, Object> apply(DimValHolder valHolder)
-          {
-            return valHolder.getMetricValues();
-          }
-        }
-    );
-    return new Result<TopNResultValue>(
-        timestamp,
-        new TopNResultValue(values)
-    );
+    final List<Map<String, Object>> values = Lists.transform(holderValues, 
DimValHolder::getMetricValues);
+    return new Result<>(timestamp, new TopNResultValue(values));
   }
 }
diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java 
b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
index bc573b9dea9..cf2468f138a 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java
@@ -142,6 +142,10 @@ private TopNMapFn getMapFn(
                                                && 
columnCapabilities.isDictionaryEncoded())) {
       // Use DimExtraction for non-Strings and for non-dictionary-encoded 
Strings.
       topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
+    } else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
+      // Use DimExtraction when the dimension output type is a non-String. 
(It's like an extractionFn: there can be
+      // a many-to-one mapping, since numeric types can't represent all 
possible values of other types.)
+      topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
     } else if (selector.isAggregateAllMetrics()) {
       topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
     } else if (selector.isAggregateTopNMetricFirst() || 
query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
diff --git 
a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java 
b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
index 774543d3239..53bd2ba4f3f 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java
@@ -50,6 +50,7 @@
 import io.druid.query.cache.CacheKeyBuilder;
 import io.druid.query.dimension.DefaultDimensionSpec;
 import io.druid.query.dimension.DimensionSpec;
+import io.druid.segment.DimensionHandlerUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -384,11 +385,6 @@ public Object apply(final Result<TopNResultValue> input)
             Iterator<Object> inputIter = results.iterator();
             DateTime timestamp = granularity.toDateTime(((Number) 
inputIter.next()).longValue());
 
-            // Need a value transformer to convert generic 
Jackson-deserialized type into the proper type.
-            final Function<Object, Object> dimValueTransformer = 
TopNMapFn.getValueTransformer(
-                query.getDimensionSpec().getOutputType()
-            );
-
             while (inputIter.hasNext()) {
               List<Object> result = (List<Object>) inputIter.next();
               Map<String, Object> vals = Maps.newLinkedHashMap();
@@ -396,7 +392,11 @@ public Object apply(final Result<TopNResultValue> input)
               Iterator<AggregatorFactory> aggIter = aggs.iterator();
               Iterator<Object> resultIter = result.iterator();
 
-              vals.put(query.getDimensionSpec().getOutputName(), 
dimValueTransformer.apply(resultIter.next()));
+              // Must convert generic Jackson-deserialized type into the 
proper type.
+              vals.put(
+                  query.getDimensionSpec().getOutputName(),
+                  DimensionHandlerUtils.convertObjectToType(resultIter.next(), 
query.getDimensionSpec().getOutputType())
+              );
 
               while (aggIter.hasNext() && resultIter.hasNext()) {
                 final AggregatorFactory factory = aggIter.next();
diff --git 
a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java 
b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java
index 5ccc8d4c3fa..4b2d6172c1d 100644
--- a/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java
+++ b/processing/src/main/java/io/druid/query/topn/TopNResultBuilder.java
@@ -28,7 +28,7 @@
 public interface TopNResultBuilder
 {
   TopNResultBuilder addEntry(
-      Comparable dimNameObj,
+      Comparable dimValueObj,
       Object dimValIndex,
       Object[] metricVals
   );
diff --git 
a/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
 
b/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
index 24084c1e3be..0ae56a7ff73 100644
--- 
a/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
+++ 
b/processing/src/main/java/io/druid/query/topn/types/NumericTopNColumnSelectorStrategy.java
@@ -19,7 +19,7 @@
 
 package io.druid.query.topn.types;
 
-import com.google.common.base.Function;
+import io.druid.java.util.common.IAE;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.topn.BaseTopNAlgorithm;
 import io.druid.query.topn.TopNParams;
@@ -29,6 +29,7 @@
 import io.druid.segment.BaseFloatColumnValueSelector;
 import io.druid.segment.BaseLongColumnValueSelector;
 import io.druid.segment.Cursor;
+import io.druid.segment.DimensionHandlerUtils;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.column.ValueType;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -37,12 +38,32 @@
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 
 import java.util.Map;
+import java.util.function.Function;
 
 public abstract class NumericTopNColumnSelectorStrategy<
     ValueSelectorType,
     DimExtractionAggregateStoreType extends Map<?, Aggregator[]>>
     implements TopNColumnSelectorStrategy<ValueSelectorType, 
DimExtractionAggregateStoreType>
 {
+  public static TopNColumnSelectorStrategy ofType(final ValueType 
selectorType, final ValueType dimensionType)
+  {
+    final Function<Object, Comparable<?>> converter = 
DimensionHandlerUtils.converterFromTypeToTypeNonNull(
+        selectorType,
+        dimensionType
+    );
+
+    switch (selectorType) {
+      case LONG:
+        return new OfLong(converter);
+      case FLOAT:
+        return new OfFloat(converter);
+      case DOUBLE:
+        return new OfDouble(converter);
+      default:
+        throw new IAE("No strategy for type[%s]", selectorType);
+    }
+  }
+
   @Override
   public int getCardinality(ValueSelectorType selector)
   {
@@ -132,7 +153,6 @@ static long longDimExtractionScanAndAggregate(
   @Override
   public void updateDimExtractionResults(
       final DimExtractionAggregateStoreType aggregatesStore,
-      final Function<Object, Object> valueTransformer,
       final TopNResultBuilder resultBuilder
   )
   {
@@ -144,11 +164,7 @@ public void updateDimExtractionResults(
           vals[i] = aggs[i].get();
         }
 
-        Comparable key = 
convertAggregatorStoreKeyToColumnValue(entry.getKey());
-        if (valueTransformer != null) {
-          key = (Comparable) valueTransformer.apply(key);
-        }
-
+        final Comparable key = 
convertAggregatorStoreKeyToColumnValue(entry.getKey());
         resultBuilder.addEntry(key, key, vals);
       }
     }
@@ -159,10 +175,11 @@ public void updateDimExtractionResults(
   static class OfFloat
       extends NumericTopNColumnSelectorStrategy<BaseFloatColumnValueSelector, 
Int2ObjectMap<Aggregator[]>>
   {
-    @Override
-    public ValueType getValueType()
+    private final Function<Object, Comparable<?>> converter;
+
+    OfFloat(final Function<Object, Comparable<?>> converter)
     {
-      return ValueType.FLOAT;
+      this.converter = converter;
     }
 
     @Override
@@ -174,7 +191,7 @@ public ValueType getValueType()
     @Override
     Comparable convertAggregatorStoreKeyToColumnValue(Object 
aggregatorStoreKey)
     {
-      return Float.intBitsToFloat((Integer) aggregatorStoreKey);
+      return converter.apply(Float.intBitsToFloat((Integer) 
aggregatorStoreKey));
     }
 
     @Override
@@ -193,10 +210,11 @@ public long dimExtractionScanAndAggregate(
   static class OfLong
       extends NumericTopNColumnSelectorStrategy<BaseLongColumnValueSelector, 
Long2ObjectMap<Aggregator[]>>
   {
-    @Override
-    public ValueType getValueType()
+    private final Function<Object, Comparable<?>> converter;
+
+    OfLong(final Function<Object, Comparable<?>> converter)
     {
-      return ValueType.LONG;
+      this.converter = converter;
     }
 
     @Override
@@ -208,7 +226,7 @@ public ValueType getValueType()
     @Override
     Comparable convertAggregatorStoreKeyToColumnValue(Object 
aggregatorStoreKey)
     {
-      return (Long) aggregatorStoreKey;
+      return converter.apply(aggregatorStoreKey);
     }
 
     @Override
@@ -227,10 +245,11 @@ public long dimExtractionScanAndAggregate(
   static class OfDouble
       extends NumericTopNColumnSelectorStrategy<BaseDoubleColumnValueSelector, 
Long2ObjectMap<Aggregator[]>>
   {
-    @Override
-    public ValueType getValueType()
+    private final Function<Object, Comparable<?>> converter;
+
+    OfDouble(final Function<Object, Comparable<?>> converter)
     {
-      return ValueType.DOUBLE;
+      this.converter = converter;
     }
 
     @Override
@@ -242,7 +261,7 @@ public ValueType getValueType()
     @Override
     Comparable convertAggregatorStoreKeyToColumnValue(Object 
aggregatorStoreKey)
     {
-      return Double.longBitsToDouble((Long) aggregatorStoreKey);
+      return converter.apply(Double.longBitsToDouble((Long) 
aggregatorStoreKey));
     }
 
     @Override
diff --git 
a/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
 
b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
index 724c4f09780..17b72989597 100644
--- 
a/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
+++ 
b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java
@@ -19,34 +19,47 @@
 
 package io.druid.query.topn.types;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.topn.BaseTopNAlgorithm;
 import io.druid.query.topn.TopNParams;
 import io.druid.query.topn.TopNQuery;
 import io.druid.query.topn.TopNResultBuilder;
 import io.druid.segment.Cursor;
+import io.druid.segment.DimensionHandlerUtils;
 import io.druid.segment.DimensionSelector;
 import io.druid.segment.StorageAdapter;
 import io.druid.segment.column.ValueType;
 import io.druid.segment.data.IndexedInts;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 
 public class StringTopNColumnSelectorStrategy
-    implements TopNColumnSelectorStrategy<DimensionSelector, Map<String, 
Aggregator[]>>
+    implements TopNColumnSelectorStrategy<DimensionSelector, Map<Comparable, 
Aggregator[]>>
 {
-  @Override
-  public int getCardinality(DimensionSelector selector)
+  private final Function<Object, Comparable<?>> dimensionValueConverter;
+
+  public StringTopNColumnSelectorStrategy(final ValueType dimensionType)
   {
-    return selector.getValueCardinality();
+    // We can handle null strings, but not null numbers.
+    if (dimensionType == ValueType.STRING) {
+      this.dimensionValueConverter = 
DimensionHandlerUtils.converterFromTypeToType(
+          ValueType.STRING,
+          dimensionType
+      );
+    } else {
+      this.dimensionValueConverter = 
DimensionHandlerUtils.converterFromTypeToTypeNonNull(
+          ValueType.STRING,
+          dimensionType
+      );
+    }
   }
 
   @Override
-  public ValueType getValueType()
+  public int getCardinality(DimensionSelector selector)
   {
-    return ValueType.STRING;
+    return selector.getValueCardinality();
   }
 
   @Override
@@ -71,9 +84,9 @@ public ValueType getValueType()
   }
 
   @Override
-  public Map<String, Aggregator[]> makeDimExtractionAggregateStore()
+  public Map<Comparable, Aggregator[]> makeDimExtractionAggregateStore()
   {
-    return Maps.newHashMap();
+    return new HashMap<>();
   }
 
   @Override
@@ -82,7 +95,7 @@ public long dimExtractionScanAndAggregate(
       DimensionSelector selector,
       Cursor cursor,
       Aggregator[][] rowSelector,
-      Map<String, Aggregator[]> aggregatesStore
+      Map<Comparable, Aggregator[]> aggregatesStore
   )
   {
     if (selector.getValueCardinality() != 
DimensionSelector.CARDINALITY_UNKNOWN) {
@@ -94,12 +107,11 @@ public long dimExtractionScanAndAggregate(
 
   @Override
   public void updateDimExtractionResults(
-      final Map<String, Aggregator[]> aggregatesStore,
-      final Function<Object, Object> valueTransformer,
+      final Map<Comparable, Aggregator[]> aggregatesStore,
       final TopNResultBuilder resultBuilder
   )
   {
-    for (Map.Entry<String, Aggregator[]> entry : aggregatesStore.entrySet()) {
+    for (Map.Entry<Comparable, Aggregator[]> entry : 
aggregatesStore.entrySet()) {
       Aggregator[] aggs = entry.getValue();
       if (aggs != null) {
         Object[] vals = new Object[aggs.length];
@@ -107,16 +119,8 @@ public void updateDimExtractionResults(
           vals[i] = aggs[i].get();
         }
 
-        Comparable key = entry.getKey();
-        if (valueTransformer != null) {
-          key = (Comparable) valueTransformer.apply(key);
-        }
-
-        resultBuilder.addEntry(
-            key,
-            key,
-            vals
-        );
+        final Comparable key = dimensionValueConverter.apply(entry.getKey());
+        resultBuilder.addEntry(key, key, vals);
       }
     }
   }
@@ -126,7 +130,7 @@ private long 
dimExtractionScanAndAggregateWithCardinalityKnown(
       Cursor cursor,
       DimensionSelector selector,
       Aggregator[][] rowSelector,
-      Map<String, Aggregator[]> aggregatesStore
+      Map<Comparable, Aggregator[]> aggregatesStore
   )
   {
     long processedRows = 0;
@@ -136,7 +140,7 @@ private long 
dimExtractionScanAndAggregateWithCardinalityKnown(
         final int dimIndex = dimValues.get(i);
         Aggregator[] theAggregators = rowSelector[dimIndex];
         if (theAggregators == null) {
-          final String key = selector.lookupName(dimIndex);
+          final Comparable<?> key = 
dimensionValueConverter.apply(selector.lookupName(dimIndex));
           theAggregators = aggregatesStore.get(key);
           if (theAggregators == null) {
             theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, 
query.getAggregatorSpecs());
@@ -159,7 +163,7 @@ private long 
dimExtractionScanAndAggregateWithCardinalityUnknown(
       TopNQuery query,
       Cursor cursor,
       DimensionSelector selector,
-      Map<String, Aggregator[]> aggregatesStore
+      Map<Comparable, Aggregator[]> aggregatesStore
   )
   {
     long processedRows = 0;
@@ -167,7 +171,7 @@ private long 
dimExtractionScanAndAggregateWithCardinalityUnknown(
       final IndexedInts dimValues = selector.getRow();
       for (int i = 0; i < dimValues.size(); ++i) {
         final int dimIndex = dimValues.get(i);
-        final String key = selector.lookupName(dimIndex);
+        final Comparable<?> key = 
dimensionValueConverter.apply(selector.lookupName(dimIndex));
 
         Aggregator[] theAggregators = aggregatesStore.get(key);
         if (theAggregators == null) {
diff --git 
a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java
 
b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java
index a8f5d327339..5b8a4116bea 100644
--- 
a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java
+++ 
b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java
@@ -19,7 +19,6 @@
 
 package io.druid.query.topn.types;
 
-import com.google.common.base.Function;
 import io.druid.query.aggregation.Aggregator;
 import io.druid.query.dimension.ColumnSelectorStrategy;
 import io.druid.query.topn.TopNParams;
@@ -27,9 +26,7 @@
 import io.druid.query.topn.TopNResultBuilder;
 import io.druid.segment.Cursor;
 import io.druid.segment.StorageAdapter;
-import io.druid.segment.column.ValueType;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
 public interface TopNColumnSelectorStrategy<ValueSelectorType, 
DimExtractionAggregateStoreType extends Map>
@@ -39,8 +36,6 @@
 
   int getCardinality(ValueSelectorType selector);
 
-  ValueType getValueType();
-
   /**
    * Used by DimExtractionTopNAlgorithm.
    *
@@ -107,13 +102,11 @@ long dimExtractionScanAndAggregate(
    * Read entries from the aggregates store, adding the keys and associated 
values to the resultBuilder, applying the
    * valueTransformer to the keys if present
    *
-   * @param aggregatesStore  Map created by makeDimExtractionAggregateStore()
-   * @param valueTransformer Converts keys to different types, if null no 
conversion is needed
-   * @param resultBuilder    TopN result builder
+   * @param aggregatesStore Map created by makeDimExtractionAggregateStore()
+   * @param resultBuilder   TopN result builder
    */
   void updateDimExtractionResults(
       DimExtractionAggregateStoreType aggregatesStore,
-      @Nullable Function<Object, Object> valueTransformer,
       TopNResultBuilder resultBuilder
   );
 }
diff --git 
a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
 
b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
index 1d252f6a575..7dd77bb194a 100644
--- 
a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
+++ 
b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategyFactory.java
@@ -19,6 +19,7 @@
 
 package io.druid.query.topn.types;
 
+import com.google.common.base.Preconditions;
 import io.druid.java.util.common.IAE;
 import io.druid.query.dimension.ColumnSelectorStrategyFactory;
 import io.druid.segment.ColumnValueSelector;
@@ -27,23 +28,39 @@
 
 public class TopNColumnSelectorStrategyFactory implements 
ColumnSelectorStrategyFactory<TopNColumnSelectorStrategy>
 {
+  private final ValueType dimensionType;
+
+  public TopNColumnSelectorStrategyFactory(final ValueType dimensionType)
+  {
+    this.dimensionType = Preconditions.checkNotNull(dimensionType, 
"dimensionType");
+  }
+
   @Override
   public TopNColumnSelectorStrategy makeColumnSelectorStrategy(
       ColumnCapabilities capabilities, ColumnValueSelector selector
   )
   {
-    ValueType type = capabilities.getType();
-    switch (type) {
+    final ValueType selectorType = capabilities.getType();
+
+    switch (selectorType) {
       case STRING:
-        return new StringTopNColumnSelectorStrategy();
+        // Return strategy that reads strings and outputs dimensionTypes.
+        return new StringTopNColumnSelectorStrategy(dimensionType);
       case LONG:
-        return new NumericTopNColumnSelectorStrategy.OfLong();
       case FLOAT:
-        return new NumericTopNColumnSelectorStrategy.OfFloat();
       case DOUBLE:
-        return new NumericTopNColumnSelectorStrategy.OfDouble();
+        if (ValueType.isNumeric(dimensionType)) {
+          // Return strategy that aggregates using the _output_ type, because 
this allows us to collapse values
+          // properly (numeric types cannot represent all values of other 
numeric types).
+          return NumericTopNColumnSelectorStrategy.ofType(dimensionType, 
dimensionType);
+        } else {
+          // Return strategy that aggregates using the _input_ type. Here we 
are assuming that the output type can
+          // represent all possible values of the input type. This will be 
true for STRING, which is the only
+          // non-numeric type currently supported.
+          return NumericTopNColumnSelectorStrategy.ofType(selectorType, 
dimensionType);
+        }
       default:
-        throw new IAE("Cannot create query type helper from invalid type 
[%s]", type);
+        throw new IAE("Cannot create query type helper from invalid type 
[%s]", selectorType);
     }
   }
 }
diff --git 
a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java 
b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java
index 5c846b68c60..375f27cd5aa 100644
--- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java
+++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java
@@ -19,12 +19,14 @@
 
 package io.druid.segment;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Doubles;
 import com.google.common.primitives.Floats;
 import io.druid.common.guava.GuavaUtils;
 import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.guava.Comparators;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.ColumnSelectorPlus;
 import io.druid.query.dimension.ColumnSelectorStrategy;
@@ -38,6 +40,7 @@
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 public final class DimensionHandlerUtils
 {
@@ -132,9 +135,10 @@ public static DimensionHandler getHandlerFromCapabilities(
    * in a query engine. See GroupByStrategyFactory for a reference.
    *
    * @param <ColumnSelectorStrategyClass> The strategy type created by the 
provided strategy factory.
-   * @param strategyFactory A factory provided by query engines that generates 
type-handling strategies
-   * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus 
objects for
-   * @param columnSelectorFactory Used to create value selectors for columns.
+   * @param strategyFactory               A factory provided by query engines 
that generates type-handling strategies
+   * @param dimensionSpecs                The set of columns to generate 
ColumnSelectorPlus objects for
+   * @param columnSelectorFactory         Used to create value selectors for 
columns.
+   *
    * @return An array of ColumnSelectorPlus objects, in the order of the 
columns specified in dimensionSpecs
    */
   public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
@@ -237,6 +241,15 @@ private static ColumnCapabilities getEffectiveCapabilities(
     return strategyFactory.makeColumnSelectorStrategy(capabilities, selector);
   }
 
+  @Nullable
+  public static String convertObjectToString(@Nullable Object valObj)
+  {
+    if (valObj == null) {
+      return null;
+    }
+    return valObj.toString();
+  }
+
   @Nullable
   public static Long convertObjectToLong(@Nullable Object valObj)
   {
@@ -293,6 +306,114 @@ public static Float convertObjectToFloat(@Nullable Object 
valObj, boolean report
     }
   }
 
+  @Nullable
+  public static Comparable<?> convertObjectToType(
+      @Nullable final Object obj,
+      final ValueType type,
+      final boolean reportParseExceptions
+  )
+  {
+    Preconditions.checkNotNull(type, "type");
+
+    switch (type) {
+      case LONG:
+        return convertObjectToLong(obj, reportParseExceptions);
+      case FLOAT:
+        return convertObjectToFloat(obj, reportParseExceptions);
+      case DOUBLE:
+        return convertObjectToDouble(obj, reportParseExceptions);
+      case STRING:
+        return convertObjectToString(obj);
+      default:
+        throw new IAE("Type[%s] is not supported for dimensions!", type);
+    }
+  }
+
+  public static int compareObjectsAsType(
+      @Nullable final Object lhs,
+      @Nullable final Object rhs,
+      final ValueType type
+  )
+  {
+    //noinspection unchecked
+    return Comparators.<Comparable>naturalNullsFirst().compare(
+        convertObjectToType(lhs, type),
+        convertObjectToType(rhs, type)
+    );
+  }
+
+  @Nullable
+  public static Comparable<?> convertObjectToType(
+      @Nullable final Object obj,
+      final ValueType type
+  )
+  {
+    return convertObjectToType(obj, Preconditions.checkNotNull(type, "type"), 
false);
+  }
+
+  /**
+   * This function only exists in the backport of #6220 to 0.12.x. It won't 
return nulls.
+   */
+  public static Comparable<?> convertObjectToTypeNonNull(
+      @Nullable final Object obj,
+      final ValueType type
+  )
+  {
+    return nonNullify(convertObjectToType(obj, 
Preconditions.checkNotNull(type, "type"), false), type);
+  }
+
+  /**
+   * This function only exists in the backport of #6220 to 0.12.x. It won't 
return nulls.
+   */
+  private static Comparable<?> nonNullify(@Nullable final Comparable<?> obj, 
final ValueType type)
+  {
+    if (obj == null) {
+      switch (type) {
+        case LONG:
+          return 0L;
+        case DOUBLE:
+          return 0.0d;
+        case FLOAT:
+          return 0.0f;
+        case STRING:
+          return "";
+        default:
+          throw new IAE("Cannot handle type[%s]", type);
+      }
+    } else {
+      return obj;
+    }
+  }
+
+  public static Function<Object, Comparable<?>> converterFromTypeToType(
+      final ValueType fromType,
+      final ValueType toType
+  )
+  {
+    if (fromType == toType) {
+      //noinspection unchecked
+      return (Function) Function.identity();
+    } else {
+      return obj -> convertObjectToType(obj, toType);
+    }
+  }
+
+  /**
+   * This function only exists in the backport of #6220 to 0.12.x. It won't 
return nulls.
+   */
+  public static Function<Object, Comparable<?>> converterFromTypeToTypeNonNull(
+      final ValueType fromType,
+      final ValueType toType
+  )
+  {
+    if (fromType == toType) {
+      //noinspection unchecked
+      return obj -> nonNullify((Comparable<?>) obj, toType);
+    } else {
+      return obj -> convertObjectToTypeNonNull(obj, toType);
+    }
+  }
+
   @Nullable
   public static Double convertObjectToDouble(@Nullable Object valObj)
   {
diff --git 
a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java 
b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
index 4896910e78f..34c341e86e0 100644
--- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java
@@ -5132,6 +5132,138 @@ public void testFullOnTopNLongTimeColumn()
     assertExpectedResults(expectedResults, query);
   }
 
+  @Test
+  public void testSortOnDoubleAsLong()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("index", "index_alias", 
ValueType.LONG))
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 59L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 67L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 68L)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 69L)
+                        .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testSortOnTimeAsLong()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("__time", "__time_alias", 
ValueType.LONG))
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", 
DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", 
DateTimes.of("2011-01-13T00:00:00.000Z").getMillis())
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", 
DateTimes.of("2011-01-14T00:00:00.000Z").getMillis())
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("__time_alias", 
DateTimes.of("2011-01-15T00:00:00.000Z").getMillis())
+                        .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testSortOnStringAsDouble()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("market", "alias", 
ValueType.DOUBLE))
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    final Map<String, Object> nullAliasMap = new HashMap<>();
+    nullAliasMap.put("alias", 0.0d);
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(Collections.singletonList(nullAliasMap))
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
+  @Test
+  public void testSortOnDoubleAsDouble()
+  {
+    TopNQuery query = new TopNQueryBuilder()
+        .dataSource(QueryRunnerTestHelper.dataSource)
+        .granularity(QueryRunnerTestHelper.allGran)
+        .dimension(new DefaultDimensionSpec("index", "index_alias", 
ValueType.DOUBLE))
+        .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+        .threshold(4)
+        .intervals(QueryRunnerTestHelper.fullOnInterval)
+        .build();
+
+    List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            DateTimes.of("2011-01-12T00:00:00.000Z"),
+            new TopNResultValue(
+                Arrays.<Map<String, Object>>asList(
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 59.021022d)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 59.266595d)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 67.73117d)
+                        .build(),
+                    ImmutableMap.<String, Object>builder()
+                        .put("index_alias", 68.573162d)
+                        .build()
+                )
+            )
+        )
+    );
+    assertExpectedResults(expectedResults, query);
+  }
+
   @Test
   public void testFullOnTopNLongTimeColumnWithExFn()
   {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to