This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 02b2057  extract generic dictionary encoded column indexing and 
merging stuffs (#11829)
02b2057 is described below

commit 02b205737124f42f5cdebe7beec31dc66cfc9a8a
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Oct 22 17:31:22 2021 -0700

    extract generic dictionary encoded column indexing and merging stuffs 
(#11829)
    
    * extract generic dictionary encoded column indexing and merging stuffs to 
pave the path towards supporting other types of dictionary encoded columns
    
    * spotbugs and inspections fixes
    
    * friendlier
    
    * javadoc
    
    * better name
    
    * adjust
---
 .../segment/DictionaryEncodedColumnIndexer.java    | 261 +++++++++
 ...rV9.java => DictionaryEncodedColumnMerger.java} | 376 ++++++-------
 .../druid/segment/DictionaryMergingIterator.java   | 177 ++++++
 .../apache/druid/segment/DimensionDictionary.java  | 163 ++++++
 .../java/org/apache/druid/segment/IndexMerger.java | 207 -------
 .../druid/segment/SortedDimensionDictionary.java   |  73 +++
 .../druid/segment/StringDimensionIndexer.java      | 404 +------------
 .../druid/segment/StringDimensionMergerV9.java     | 626 ++-------------------
 .../apache/druid/segment/data/GenericIndexed.java  |   2 +-
 .../druid/segment/data/GenericIndexedWriter.java   |   4 +-
 .../segment/serde/ComplexColumnPartSerde.java      |   5 +-
 ...est.java => DictionaryMergingIteratorTest.java} |   9 +-
 .../apache/druid/segment/IndexMergerTestBase.java  |  28 -
 13 files changed, 919 insertions(+), 1416 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnIndexer.java
 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnIndexer.java
new file mode 100644
index 0000000..17202f5
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnIndexer.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.data.CloseableIndexed;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.IndexedIterable;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+
+/**
+ * Basic structure for indexing dictionary encoded columns
+ */
+public abstract class DictionaryEncodedColumnIndexer<KeyType, ActualType 
extends Comparable<ActualType>>
+    implements DimensionIndexer<Integer, KeyType, ActualType>
+{
+  protected final DimensionDictionary<ActualType> dimLookup;
+  protected volatile boolean isSparse = false;
+
+  @Nullable
+  protected SortedDimensionDictionary<ActualType> sortedLookup;
+
+  /**
+   * Creates an indexer whose unsorted dictionary, {@link #dimLookup}, is a newly constructed
+   * {@link DimensionDictionary}.
+   */
+  public DictionaryEncodedColumnIndexer()
+  {
+    // diamond instead of the raw type, so the assignment is type-checked against DimensionDictionary<ActualType>
+    this.dimLookup = new DimensionDictionary<>();
+  }
+
+  /**
+   * Flags this indexer as sparse by setting {@link #isSparse}. The flag is read by
+   * {@link #dictionaryEncodesAllValues()}.
+   */
+  @Override
+  public void setSparseIndexed()
+  {
+    this.isSparse = true;
+  }
+
+  /**
+   * Translates an unsorted dictionary id into the matching id of the sorted dictionary.
+   *
+   * @param unsortedIntermediateValue id in the unsorted dictionary
+   * @return the id that {@link #sortedLookup()} maps the unsorted id to
+   */
+  public int getSortedEncodedValueFromUnsorted(Integer unsortedIntermediateValue)
+  {
+    final SortedDimensionDictionary<ActualType> sorted = sortedLookup();
+    return sorted.getSortedIdFromUnsortedId(unsortedIntermediateValue);
+  }
+
+  /**
+   * Translates a sorted dictionary id back into the matching unsorted dictionary id.
+   *
+   * @param sortedIntermediateValue id in the sorted dictionary
+   * @return the id that {@link #sortedLookup()} maps the sorted id to
+   */
+  @Override
+  public Integer getUnsortedEncodedValueFromSorted(Integer sortedIntermediateValue)
+  {
+    final SortedDimensionDictionary<ActualType> sorted = sortedLookup();
+    return sorted.getUnsortedIdFromSortedId(sortedIntermediateValue);
+  }
+
+  /**
+   * Exposes the dictionary as a {@link CloseableIndexed} whose positions are sorted ids.
+   * The returned view delegates to this indexer on every call; its {@code close()} and
+   * {@code inspectRuntimeShape} methods do nothing.
+   */
+  @Override
+  public CloseableIndexed<ActualType> getSortedIndexedValues()
+  {
+    return new CloseableIndexed<ActualType>()
+    {
+      @Override
+      public int size()
+      {
+        return getCardinality();
+      }
+
+      @Override
+      public ActualType get(int index)
+      {
+        // index is interpreted as a sorted id
+        return getActualValue(index, true);
+      }
+
+      @Override
+      public int indexOf(ActualType value)
+      {
+        final int unsortedId = getEncodedValue(value, false);
+        if (unsortedId < 0) {
+          // a negative unsorted id is reported as absent rather than translated
+          return DimensionDictionary.ABSENT_VALUE_ID;
+        }
+        return getSortedEncodedValueFromUnsorted(unsortedId);
+      }
+
+      @Override
+      public Iterator<ActualType> iterator()
+      {
+        return IndexedIterable.create(this).iterator();
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        // nothing to inspect
+      }
+
+      @Override
+      public void close()
+      {
+        // nothing to close
+      }
+    };
+  }
+
+  /**
+   * Returns the minimum value as reported by {@link #dimLookup}.
+   */
+  @Override
+  public ActualType getMinValue()
+  {
+    final ActualType smallest = dimLookup.getMinValue();
+    return smallest;
+  }
+
+  /**
+   * Returns the maximum value as reported by {@link #dimLookup}.
+   */
+  @Override
+  public ActualType getMaxValue()
+  {
+    final ActualType largest = dimLookup.getMaxValue();
+    return largest;
+  }
+
+  /**
+   * Returns the current size of {@link #dimLookup}.
+   */
+  @Override
+  public int getCardinality()
+  {
+    final int dictionaryEntries = dimLookup.size();
+    return dictionaryEntries;
+  }
+
+  /**
+   * Builds the column value selector by delegating to {@code makeDimensionSelector} with a
+   * {@link DefaultDimensionSpec} created from the dimension's name.
+   */
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(
+      IncrementalIndexRowHolder currEntry,
+      IncrementalIndex.DimensionDesc desc
+  )
+  {
+    final String columnName = desc.getName();
+    return makeDimensionSelector(DefaultDimensionSpec.of(columnName), currEntry, desc);
+  }
+
+  /**
+   * Wraps a selector that yields unsorted dictionary ids so that its row values are reported as sorted ids.
+   * The argument is cast to {@link DimensionSelector}. The wrapper only supports row access, value cardinality,
+   * object access and runtime shape inspection; value matchers, name lookup and id lookup throw
+   * {@link UnsupportedOperationException}.
+   */
+  @Override
+  public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues)
+  {
+    final DimensionSelector unsortedSelector = (DimensionSelector) selectorWithUnsortedValues;
+
+    // The selector doubles as its own row: getRow() returns this, and size()/get() read the delegate's current row.
+    class SortedDimensionSelector implements DimensionSelector, IndexedInts
+    {
+      @Override
+      public int size()
+      {
+        final IndexedInts unsortedRow = unsortedSelector.getRow();
+        return unsortedRow.size();
+      }
+
+      @Override
+      public int get(int index)
+      {
+        // resolve the sorted dictionary first, then translate the unsorted id found at this row position
+        final SortedDimensionDictionary<ActualType> sorted = sortedLookup();
+        final int unsortedId = unsortedSelector.getRow().get(index);
+        return sorted.getSortedIdFromUnsortedId(unsortedId);
+      }
+
+      @Override
+      public IndexedInts getRow()
+      {
+        return this;
+      }
+
+      @Override
+      public ValueMatcher makeValueMatcher(@Nullable String value)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getValueCardinality()
+      {
+        return unsortedSelector.getValueCardinality();
+      }
+
+      @Nullable
+      @Override
+      public String lookupName(int id)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public boolean nameLookupPossibleInAdvance()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Nullable
+      @Override
+      public IdLookup idLookup()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("dimSelectorWithUnsortedValues", unsortedSelector);
+      }
+
+      @Nullable
+      @Override
+      public Object getObject()
+      {
+        return unsortedSelector.getObject();
+      }
+
+      @Override
+      public Class classOfObject()
+      {
+        return unsortedSelector.classOfObject();
+      }
+    }
+
+    return new SortedDimensionSelector();
+  }
+
+  /**
+   * returns true if all values are encoded in {@link #dimLookup}
+   */
+  protected boolean dictionaryEncodesAllValues()
+  {
+    // name lookup is possible in advance if we explicitly process a value for every row, or if we've encountered an
+    // actual null value and it is present in our dictionary. otherwise the dictionary will be missing ids for
+    // implicit null values
+    if (!isSparse) {
+      return true;
+    }
+    return dimLookup.getIdForNull() != DimensionDictionary.ABSENT_VALUE_ID;
+  }
+
+  /**
+   * Returns the sorted dictionary, creating it from {@link #dimLookup} via {@code sort()} on first use and
+   * caching it in {@link #sortedLookup} afterwards.
+   */
+  protected SortedDimensionDictionary<ActualType> sortedLookup()
+  {
+    SortedDimensionDictionary<ActualType> sorted = sortedLookup;
+    if (sorted == null) {
+      sorted = dimLookup.sort();
+      sortedLookup = sorted;
+    }
+    return sorted;
+  }
+
+  /**
+   * Resolves a dictionary id to its value.
+   *
+   * @param intermediateValue the dictionary id to resolve
+   * @param idSorted          true if the id refers to the sorted dictionary, false if it refers to
+   *                          {@link #dimLookup}
+   */
+  @Nullable
+  protected ActualType getActualValue(int intermediateValue, boolean idSorted)
+  {
+    if (!idSorted) {
+      return dimLookup.getValue(intermediateValue);
+    }
+    return sortedLookup().getValueFromSortedId(intermediateValue);
+  }
+
+  protected int getEncodedValue(@Nullable ActualType fullValue, boolean 
idSorted)
+  {
+    int unsortedId = dimLookup.getId(fullValue);
+
+    if (idSorted) {
+      return sortedLookup().getSortedIdFromUnsortedId(unsortedId);
+    } else {
+      return unsortedId;
+    }
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
similarity index 79%
copy from 
processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
copy to 
processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
index abb4637..b78bfd9 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
@@ -20,45 +20,36 @@
 package org.apache.druid.segment;
 
 import com.google.common.base.Predicate;
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
+import com.google.common.collect.PeekingIterator;
 import it.unimi.dsi.fastutil.ints.IntIterable;
 import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
-import org.apache.druid.collections.spatial.ImmutableRTree;
-import org.apache.druid.collections.spatial.RTree;
-import org.apache.druid.collections.spatial.split.LinearGutmanSplitStrategy;
-import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.filter.ValueMatcher;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ColumnDescriptor;
-import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.BitmapValues;
-import org.apache.druid.segment.data.ByteBufferWriter;
 import org.apache.druid.segment.data.CloseableIndexed;
 import org.apache.druid.segment.data.ColumnarIntsSerializer;
 import org.apache.druid.segment.data.ColumnarMultiIntsSerializer;
 import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.IndexedInts;
-import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import 
org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
 import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
-import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nonnull;
@@ -67,48 +58,46 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.IntBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
-public class StringDimensionMergerV9 implements DimensionMergerV9
+/**
+ * Base structure for merging dictionary encoded columns
+ */
+public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> 
implements DimensionMergerV9
 {
-  private static final Logger log = new Logger(StringDimensionMergerV9.class);
-
-  private static final Indexed<String> NULL_STR_DIM_VAL = new 
ListIndexed<>(Collections.singletonList(null));
-  private static final Splitter SPLITTER = Splitter.on(",");
+  private static final Logger log = new 
Logger(DictionaryEncodedColumnMerger.class);
 
-  private final String dimensionName;
-  private final ProgressIndicator progress;
-  private final Closer closer;
-  private final IndexSpec indexSpec;
-  private final SegmentWriteOutMedium segmentWriteOutMedium;
-  private final MutableBitmap nullRowsBitmap;
-  private final ColumnCapabilities capabilities;
+  protected final String dimensionName;
+  protected final ProgressIndicator progress;
+  protected final Closer closer;
+  protected final IndexSpec indexSpec;
+  protected final SegmentWriteOutMedium segmentWriteOutMedium;
+  protected final MutableBitmap nullRowsBitmap;
+  protected final ColumnCapabilities capabilities;
 
-  private int dictionarySize;
-  private int rowCount = 0;
-  private int cardinality = 0;
-  private boolean hasNull = false;
+  protected int dictionarySize;
+  protected int rowCount = 0;
+  protected int cardinality = 0;
+  protected boolean hasNull = false;
 
   @Nullable
-  private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
+  protected GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
   @Nullable
-  private ByteBufferWriter<ImmutableRTree> spatialWriter;
+  protected ArrayList<IntBuffer> dimConversions;
   @Nullable
-  private ArrayList<IntBuffer> dimConversions;
+  protected List<IndexableAdapter> adapters;
   @Nullable
-  private List<IndexableAdapter> adapters;
+  protected DictionaryMergingIterator<T> dictionaryMergeIterator;
   @Nullable
-  private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
+  protected ColumnarIntsSerializer encodedValueSerializer;
   @Nullable
-  private ColumnarIntsSerializer encodedValueSerializer;
+  protected GenericIndexedWriter<T> dictionaryWriter;
   @Nullable
-  private GenericIndexedWriter<String> dictionaryWriter;
-  @Nullable
-  private String firstDictionaryValue;
+  protected T firstDictionaryValue;
 
 
-  public StringDimensionMergerV9(
+  public DictionaryEncodedColumnMerger(
       String dimensionName,
       IndexSpec indexSpec,
       SegmentWriteOutMedium segmentWriteOutMedium,
@@ -121,12 +110,18 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     this.indexSpec = indexSpec;
     this.capabilities = capabilities;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
-    nullRowsBitmap = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+    this.nullRowsBitmap = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
 
     this.progress = progress;
     this.closer = closer;
   }
 
+  protected abstract Comparator<Pair<Integer, PeekingIterator<T>>> 
getDictionaryMergingComparator();
+  protected abstract Indexed<T> getNullDimValue();
+  protected abstract ObjectStrategy<T> getObjectStrategy();
+  @Nullable
+  protected abstract T coerceValue(T value);
+
   @Override
   public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
   {
@@ -143,11 +138,11 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     }
 
     int numMergeIndex = 0;
-    Indexed<String> dimValueLookup = null;
-    Indexed<String>[] dimValueLookups = new Indexed[adapters.size() + 1];
+    Indexed<T> dimValueLookup = null;
+    Indexed<T>[] dimValueLookups = new Indexed[adapters.size() + 1];
     for (int i = 0; i < adapters.size(); i++) {
       @SuppressWarnings("MustBeClosedChecker") // we register dimValues in the 
closer
-          Indexed<String> dimValues = 
closer.register(adapters.get(i).getDimValueLookup(dimensionName));
+      Indexed<T> dimValues = 
closer.register(adapters.get(i).getDimValueLookup(dimensionName));
       if (dimValues != null && !allNull(dimValues)) {
         dimHasValues = true;
         hasNull |= dimValues.indexOf(null) >= 0;
@@ -169,19 +164,23 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
      */
     if (convertMissingValues && !hasNull) {
       hasNull = true;
-      dimValueLookups[adapters.size()] = dimValueLookup = NULL_STR_DIM_VAL;
+      dimValueLookups[adapters.size()] = dimValueLookup = getNullDimValue();
       numMergeIndex++;
     }
 
     String dictFilename = StringUtils.format("%s.dim_values", dimensionName);
-    dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, 
dictFilename, GenericIndexed.STRING_STRATEGY);
+    dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, 
dictFilename, getObjectStrategy());
     firstDictionaryValue = null;
     dictionarySize = 0;
     dictionaryWriter.open();
 
     cardinality = 0;
     if (numMergeIndex > 1) {
-      dictionaryMergeIterator = new 
IndexMerger.DictionaryMergeIterator(dimValueLookups, true);
+      dictionaryMergeIterator = new DictionaryMergingIterator<>(
+          dimValueLookups,
+          getDictionaryMergingComparator(),
+          true
+      );
       writeDictionary(() -> dictionaryMergeIterator);
       for (int i = 0; i < adapters.size(); i++) {
         if (dimValueLookups[i] != null && 
dictionaryMergeIterator.needConversion(i)) {
@@ -204,51 +203,6 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     setupEncodedValueWriter();
   }
 
-  private void writeDictionary(Iterable<String> dictionaryValues) throws 
IOException
-  {
-    for (String value : dictionaryValues) {
-      dictionaryWriter.write(value);
-      value = NullHandling.emptyToNullIfNeeded(value);
-      if (dictionarySize == 0) {
-        firstDictionaryValue = value;
-      }
-      dictionarySize++;
-    }
-  }
-
-  protected void setupEncodedValueWriter() throws IOException
-  {
-    final CompressionStrategy compressionStrategy = 
indexSpec.getDimensionCompression();
-
-    String filenameBase = StringUtils.format("%s.forward_dim", dimensionName);
-    if (capabilities.hasMultipleValues().isTrue()) {
-      if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
-        encodedValueSerializer = 
V3CompressedVSizeColumnarMultiIntsSerializer.create(
-            dimensionName,
-            segmentWriteOutMedium,
-            filenameBase,
-            cardinality,
-            compressionStrategy
-        );
-      } else {
-        encodedValueSerializer =
-            new VSizeColumnarMultiIntsSerializer(dimensionName, 
segmentWriteOutMedium, cardinality);
-      }
-    } else {
-      if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
-        encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-            dimensionName,
-            segmentWriteOutMedium,
-            filenameBase,
-            cardinality,
-            compressionStrategy
-        );
-      } else {
-        encodedValueSerializer = new 
VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
-      }
-    }
-    encodedValueSerializer.open();
-  }
 
   @Override
   public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues(
@@ -371,30 +325,6 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     rowCount++;
   }
 
-  private static IndexedInts getRow(ColumnValueSelector s)
-  {
-    if (s instanceof DimensionSelector) {
-      return ((DimensionSelector) s).getRow();
-    } else if (s instanceof NilColumnValueSelector) {
-      return IndexedInts.empty();
-    } else {
-      throw new ISE(
-          "ColumnValueSelector[%s], only DimensionSelector or 
NilColumnValueSelector is supported",
-          s.getClass()
-      );
-    }
-  }
-
-  private static boolean isNullRow(IndexedInts row, int size)
-  {
-    for (int i = 0; i < size; i++) {
-      if (row.get(i) != 0) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   @Override
   public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions) 
throws IOException
   {
@@ -416,15 +346,10 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
 
     BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
 
-    RTree tree = null;
-    boolean hasSpatial = capabilities.hasSpatialIndexes();
-    if (hasSpatial) {
-      spatialWriter = new ByteBufferWriter<>(
-          segmentWriteOutMedium,
-          new ImmutableRTreeObjectStrategy(bitmapFactory)
-      );
-      spatialWriter.open();
-      tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), 
bitmapFactory);
+    ExtendedIndexesMerger extendedIndexesMerger = getExtendedIndexesMerger();
+
+    if (extendedIndexesMerger != null) {
+      extendedIndexesMerger.initialize();
     }
 
     IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, 
dimensionName);
@@ -432,18 +357,19 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     //Iterate all dim values's dictionary id in ascending order which in line 
with dim values's compare result.
     for (int dictId = 0; dictId < dictionarySize; dictId++) {
       progress.progress();
-      mergeBitmaps(
+      MutableBitmap mergedIndexes = mergeBitmaps(
           segmentRowNumConversions,
           bitmapFactory,
-          tree,
-          hasSpatial,
           dictIdSeeker,
           dictId
       );
+      if (extendedIndexesMerger != null) {
+        extendedIndexesMerger.mergeIndexes(dictId, mergedIndexes);
+      }
     }
 
-    if (hasSpatial) {
-      spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
+    if (extendedIndexesMerger != null) {
+      extendedIndexesMerger.write();
     }
 
     log.debug(
@@ -458,11 +384,63 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     }
   }
 
-  void mergeBitmaps(
+
+
+  /**
+   * Hook for supplying an {@link ExtendedIndexesMerger}. This default returns null, in which case
+   * {@link #writeIndexes(List)} skips its initialize, mergeIndexes and write calls.
+   */
+  @Nullable
+  protected ExtendedIndexesMerger getExtendedIndexesMerger()
+  {
+    return null;
+  }
+
+  /**
+   * Creates and opens {@link #encodedValueSerializer}. The implementation is chosen from two properties:
+   * whether the column capabilities report multiple values, and whether the index spec's dimension compression
+   * is anything other than {@link CompressionStrategy#UNCOMPRESSED}.
+   *
+   * @throws IOException declared because the serializer factory methods and {@code open()} are called here
+   */
+  protected void setupEncodedValueWriter() throws IOException
+  {
+    final CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression();
+    final String filenameBase = StringUtils.format("%s.forward_dim", dimensionName);
+    final boolean multiValue = capabilities.hasMultipleValues().isTrue();
+    final boolean compressed = compressionStrategy != CompressionStrategy.UNCOMPRESSED;
+
+    if (multiValue && compressed) {
+      encodedValueSerializer = V3CompressedVSizeColumnarMultiIntsSerializer.create(
+          dimensionName,
+          segmentWriteOutMedium,
+          filenameBase,
+          cardinality,
+          compressionStrategy
+      );
+    } else if (multiValue) {
+      encodedValueSerializer =
+          new VSizeColumnarMultiIntsSerializer(dimensionName, segmentWriteOutMedium, cardinality);
+    } else if (compressed) {
+      encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
+          dimensionName,
+          segmentWriteOutMedium,
+          filenameBase,
+          cardinality,
+          compressionStrategy
+      );
+    } else {
+      encodedValueSerializer = new VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
+    }
+
+    encodedValueSerializer.open();
+  }
+
+  protected void writeDictionary(Iterable<T> dictionaryValues) throws 
IOException
+  {
+    for (T value : dictionaryValues) {
+      dictionaryWriter.write(value);
+      value = coerceValue(value);
+      if (dictionarySize == 0) {
+        firstDictionaryValue = value;
+      }
+      dictionarySize++;
+    }
+  }
+
+  protected MutableBitmap mergeBitmaps(
       @Nullable List<IntBuffer> segmentRowNumConversions,
       BitmapFactory bmpFactory,
-      RTree tree,
-      boolean hasSpatial,
       IndexSeeker[] dictIdSeeker,
       int dictId
   ) throws IOException
@@ -510,17 +488,7 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
 
     bitmapWriter.write(bmpFactory.makeImmutableBitmap(mergedIndexes));
 
-    if (hasSpatial) {
-      String dimVal = dictionaryWriter.get(dictId);
-      if (dimVal != null) {
-        List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
-        float[] coords = new float[stringCoords.size()];
-        for (int j = 0; j < coords.length; j++) {
-          coords[j] = Float.valueOf(stringCoords.get(j));
-        }
-        tree.insert(coords, mergedIndexes);
-      }
-    }
+    return mergedIndexes;
   }
 
   @Override
@@ -529,33 +497,62 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     return cardinality == 0;
   }
 
-  @Override
-  public ColumnDescriptor makeColumnDescriptor()
+
+  protected IndexSeeker[] toIndexSeekers(
+      List<IndexableAdapter> adapters,
+      ArrayList<IntBuffer> dimConversions,
+      String dimension
+  )
   {
-    // Now write everything
-    boolean hasMultiValue = capabilities.hasMultipleValues().isTrue();
-    final CompressionStrategy compressionStrategy = 
indexSpec.getDimensionCompression();
-    final BitmapSerdeFactory bitmapSerdeFactory = 
indexSpec.getBitmapSerdeFactory();
+    IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
+    for (int i = 0; i < adapters.size(); i++) {
+      IntBuffer dimConversion = dimConversions.get(i);
+      if (dimConversion != null) {
+        seekers[i] = new IndexSeekerWithConversion((IntBuffer) 
dimConversion.asReadOnlyBuffer().rewind());
+      } else {
+        try (CloseableIndexed<String> dimValueLookup = 
adapters.get(i).getDimValueLookup(dimension)) {
+          seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null 
? 0 : dimValueLookup.size());
+        }
+        catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+    return seekers;
+  }
 
-    final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
-    builder.setValueType(ValueType.STRING);
-    builder.setHasMultipleValues(hasMultiValue);
-    final DictionaryEncodedColumnPartSerde.SerializerBuilder partBuilder = 
DictionaryEncodedColumnPartSerde
-        .serializerBuilder()
-        .withDictionary(dictionaryWriter)
-        .withValue(
-            encodedValueSerializer,
-            hasMultiValue,
-            compressionStrategy != CompressionStrategy.UNCOMPRESSED
-        )
-        .withBitmapSerdeFactory(bitmapSerdeFactory)
-        .withBitmapIndex(bitmapWriter)
-        .withSpatialIndex(spatialWriter)
-        .withByteOrder(IndexIO.BYTE_ORDER);
-
-    return builder
-        .addSerde(partBuilder.build())
-        .build();
+  private boolean allNull(Indexed<T> dimValues)
+  {
+    for (int i = 0, size = dimValues.size(); i < size; i++) {
+      if (dimValues.get(i) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Extracts the current row from a selector: the selector's own row for a {@link DimensionSelector}, an
+   * empty row for a {@link NilColumnValueSelector}.
+   *
+   * @throws ISE if the selector is of any other type
+   */
+  private static IndexedInts getRow(ColumnValueSelector s)
+  {
+    if (s instanceof DimensionSelector) {
+      return ((DimensionSelector) s).getRow();
+    }
+    if (s instanceof NilColumnValueSelector) {
+      return IndexedInts.empty();
+    }
+    throw new ISE(
+        "ColumnValueSelector[%s], only DimensionSelector or NilColumnValueSelector is supported",
+        s.getClass()
+    );
+  }
+
+  /**
+   * Returns true when each of the first {@code size} entries of {@code row} equals 0, which includes the
+   * case where {@code size} is 0.
+   */
+  private static boolean isNullRow(IndexedInts row, int size)
+  {
+    int position = 0;
+    while (position < size) {
+      if (row.get(position) != 0) {
+        return false;
+      }
+      position++;
+    }
+    return true;
+  }
 
   protected interface IndexSeeker
@@ -677,36 +674,21 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
     }
   }
 
-  protected IndexSeeker[] toIndexSeekers(
-      List<IndexableAdapter> adapters,
-      ArrayList<IntBuffer> dimConversions,
-      String dimension
-  )
+  /**
+   * Specifies any additional per value indexes which should be constructed 
when
+   * {@link DictionaryEncodedColumnMerger#writeIndexes(List)} is called, on 
top of the standard bitmap index created
+   * with {@link DictionaryEncodedColumnMerger#mergeBitmaps}
+   */
+  interface ExtendedIndexesMerger
   {
-    IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
-    for (int i = 0; i < adapters.size(); i++) {
-      IntBuffer dimConversion = dimConversions.get(i);
-      if (dimConversion != null) {
-        seekers[i] = new IndexSeekerWithConversion((IntBuffer) 
dimConversion.asReadOnlyBuffer().rewind());
-      } else {
-        try (CloseableIndexed<String> dimValueLookup = 
adapters.get(i).getDimValueLookup(dimension)) {
-          seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null 
? 0 : dimValueLookup.size());
-        }
-        catch (IOException e) {
-          throw new UncheckedIOException(e);
-        }
-      }
-    }
-    return seekers;
-  }
+    void initialize() throws IOException;
 
-  private boolean allNull(Indexed<String> dimValues)
-  {
-    for (int i = 0, size = dimValues.size(); i < size; i++) {
-      if (dimValues.get(i) != null) {
-        return false;
-      }
-    }
-    return true;
+    /**
+     * Merge extended indexes for the given dictionaryId value. The merged 
bitmap index from
+     * {@link DictionaryEncodedColumnMerger#mergeBitmaps} is supplied should 
it be useful for the construction of
+     * the extended indexes.
+     */
+    void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws 
IOException;
+    void write() throws IOException;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DictionaryMergingIterator.java
 
b/processing/src/main/java/org/apache/druid/segment/DictionaryMergingIterator.java
new file mode 100644
index 0000000..56950d8
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/DictionaryMergingIterator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.data.Indexed;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator for merging dictionaries for some comparable type into a single 
sorted dictionary, useful when merging
+ * dictionary-encoded columns.
+ */
+public class DictionaryMergingIterator<T extends Comparable<T>> implements 
CloseableIterator<T>
+{
+  private static final Logger log = new 
Logger(DictionaryMergingIterator.class);
+
+  public static <T extends Comparable<T>> Comparator<Pair<Integer, 
PeekingIterator<T>>> makePeekingComparator()
+  {
+    return (lhs, rhs) -> {
+      T left = lhs.rhs.peek();
+      T right = rhs.rhs.peek();
+      if (left == null) {
+        //noinspection VariableNotUsedInsideIf
+        return right == null ? 0 : -1;
+      } else if (right == null) {
+        return 1;
+      } else {
+        return left.compareTo(right);
+      }
+    };
+  }
+
+  protected final IntBuffer[] conversions;
+  protected final List<Pair<ByteBuffer, Integer>> directBufferAllocations = 
new ArrayList<>();
+  protected final PriorityQueue<Pair<Integer, PeekingIterator<T>>> pQueue;
+
+  protected int counter;
+
+  public DictionaryMergingIterator(
+      Indexed<T>[] dimValueLookups,
+      Comparator<Pair<Integer, PeekingIterator<T>>> comparator,
+      boolean useDirect
+  )
+  {
+    pQueue = new PriorityQueue<>(dimValueLookups.length, comparator);
+    conversions = new IntBuffer[dimValueLookups.length];
+
+    long mergeBufferTotalSize = 0;
+    for (int i = 0; i < conversions.length; i++) {
+      if (dimValueLookups[i] == null) {
+        continue;
+      }
+      Indexed<T> indexed = dimValueLookups[i];
+      if (useDirect) {
+        int allocationSize = indexed.size() * Integer.BYTES;
+        log.trace("Allocating dictionary merging direct buffer with 
size[%,d]", allocationSize);
+        mergeBufferTotalSize += allocationSize;
+        final ByteBuffer conversionDirectBuffer = 
ByteBuffer.allocateDirect(allocationSize);
+        conversions[i] = conversionDirectBuffer.asIntBuffer();
+        directBufferAllocations.add(new Pair<>(conversionDirectBuffer, 
allocationSize));
+      } else {
+        conversions[i] = IntBuffer.allocate(indexed.size());
+        mergeBufferTotalSize += indexed.size();
+      }
+
+      final PeekingIterator<T> iter = transformIndexedIterator(indexed);
+      if (iter.hasNext()) {
+        pQueue.add(Pair.of(i, iter));
+      }
+    }
+    log.debug("Allocated [%,d] bytes of dictionary merging direct buffers", 
mergeBufferTotalSize);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !pQueue.isEmpty();
+  }
+
+  @Override
+  public T next()
+  {
+    Pair<Integer, PeekingIterator<T>> smallest = pQueue.remove();
+    if (smallest == null) {
+      throw new NoSuchElementException();
+    }
+    final T value = writeTranslate(smallest, counter);
+
+    while (!pQueue.isEmpty() && Objects.equals(value, 
pQueue.peek().rhs.peek())) {
+      writeTranslate(pQueue.remove(), counter);
+    }
+    counter++;
+
+    return value;
+  }
+
+  protected PeekingIterator<T> transformIndexedIterator(Indexed<T> indexed)
+  {
+    return Iterators.peekingIterator(
+        indexed.iterator()
+    );
+  }
+
+  protected boolean needConversion(int index)
+  {
+    IntBuffer readOnly = conversions[index].asReadOnlyBuffer();
+    readOnly.rewind();
+    int i = 0;
+    while (readOnly.hasRemaining()) {
+      if (i != readOnly.get()) {
+        return true;
+      }
+      i++;
+    }
+    return false;
+  }
+
+  protected T writeTranslate(Pair<Integer, PeekingIterator<T>> smallest, int 
counter)
+  {
+    final int index = smallest.lhs;
+    final T value = smallest.rhs.next();
+
+    conversions[index].put(counter);
+    if (smallest.rhs.hasNext()) {
+      pQueue.add(smallest);
+    }
+    return value;
+  }
+
+  @Override
+  public void remove()
+  {
+    throw new UnsupportedOperationException("remove");
+  }
+
+  @Override
+  public void close()
+  {
+    long mergeBufferTotalSize = 0;
+    for (Pair<ByteBuffer, Integer> bufferAllocation : directBufferAllocations) 
{
+      log.trace("Freeing dictionary merging direct buffer with size[%,d]", 
bufferAllocation.rhs);
+      mergeBufferTotalSize += bufferAllocation.rhs;
+      ByteBufferUtils.free(bufferAllocation.lhs);
+    }
+    log.debug("Freed [%,d] bytes of dictionary merging direct buffers", 
mergeBufferTotalSize);
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java 
b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
new file mode 100644
index 0000000..345cd7b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Buildable dictionary for some comparable type. Values are unsorted, or 
rather sorted in the order in which they are
+ * added. A {@link SortedDimensionDictionary} can be constructed with a 
mapping of ids from this dictionary to the
+ * sorted dictionary with the {@link #sort()} method.
+ *
+ * This dictionary is thread-safe.
+ */
+public class DimensionDictionary<T extends Comparable<T>>
+{
+  public static final int ABSENT_VALUE_ID = -1;
+
+  @Nullable
+  private T minValue = null;
+  @Nullable
+  private T maxValue = null;
+  private volatile int idForNull = ABSENT_VALUE_ID;
+
+  private final Object2IntMap<T> valueToId = new Object2IntOpenHashMap<>();
+
+  private final List<T> idToValue = new ArrayList<>();
+  private final ReentrantReadWriteLock lock;
+
+  public DimensionDictionary()
+  {
+    this.lock = new ReentrantReadWriteLock();
+    valueToId.defaultReturnValue(ABSENT_VALUE_ID);
+  }
+
+  public int getId(@Nullable T value)
+  {
+    lock.readLock().lock();
+    try {
+      if (value == null) {
+        return idForNull;
+      }
+      return valueToId.getInt(value);
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Nullable
+  public T getValue(int id)
+  {
+    lock.readLock().lock();
+    try {
+      if (id == idForNull) {
+        return null;
+      }
+      return idToValue.get(id);
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public int size()
+  {
+    lock.readLock().lock();
+    try {
+      // using idToValue rather than valueToId because valueToId doesn't 
account for the null value, if it is present.
+      return idToValue.size();
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public int add(@Nullable T originalValue)
+  {
+    lock.writeLock().lock();
+    try {
+      if (originalValue == null) {
+        if (idForNull == ABSENT_VALUE_ID) {
+          idForNull = idToValue.size();
+          idToValue.add(null);
+        }
+        return idForNull;
+      }
+      int prev = valueToId.getInt(originalValue);
+      if (prev >= 0) {
+        return prev;
+      }
+      final int index = idToValue.size();
+      valueToId.put(originalValue, index);
+      idToValue.add(originalValue);
+      minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? 
originalValue : minValue;
+      maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? 
originalValue : maxValue;
+      return index;
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  public T getMinValue()
+  {
+    lock.readLock().lock();
+    try {
+      return minValue;
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public T getMaxValue()
+  {
+    lock.readLock().lock();
+    try {
+      return maxValue;
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public int getIdForNull()
+  {
+    return idForNull;
+  }
+
+  public SortedDimensionDictionary<T> sort()
+  {
+    lock.readLock().lock();
+    try {
+      return new SortedDimensionDictionary<T>(idToValue, idToValue.size());
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
index 3876bf1..991cbdd 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java
@@ -22,21 +22,13 @@ package org.apache.druid.segment;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.PeekingIterator;
 import com.google.inject.ImplementedBy;
-import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.utils.SerializerUtils;
 import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.java.util.common.ByteBufferUtils;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.utils.CollectionUtils;
@@ -45,15 +37,9 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -292,70 +278,6 @@ public interface IndexMerger
       @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
   ) throws IOException;
 
-  interface IndexSeeker
-  {
-    int NOT_EXIST = -1;
-    int NOT_INIT = -1;
-
-    int seek(int dictId);
-  }
-
-  /**
-   * Get old dictId from new dictId, and only support access in order
-   */
-  class IndexSeekerWithConversion implements IndexSeeker
-  {
-    private final IntBuffer dimConversions;
-    private int currIndex;
-    private int currVal;
-    private int lastVal;
-
-    IndexSeekerWithConversion(IntBuffer dimConversions)
-    {
-      this.dimConversions = dimConversions;
-      this.currIndex = 0;
-      this.currVal = IndexSeeker.NOT_INIT;
-      this.lastVal = IndexSeeker.NOT_INIT;
-    }
-
-    @Override
-    public int seek(int dictId)
-    {
-      if (dimConversions == null) {
-        return IndexSeeker.NOT_EXIST;
-      }
-      if (lastVal != IndexSeeker.NOT_INIT) {
-        if (dictId <= lastVal) {
-          throw new ISE(
-              "Value dictId[%d] is less than the last value dictId[%d] I have, 
cannot be.",
-              dictId, lastVal
-          );
-        }
-        return IndexSeeker.NOT_EXIST;
-      }
-      if (currVal == IndexSeeker.NOT_INIT) {
-        currVal = dimConversions.get();
-      }
-      if (currVal == dictId) {
-        int ret = currIndex;
-        ++currIndex;
-        if (dimConversions.hasRemaining()) {
-          currVal = dimConversions.get();
-        } else {
-          lastVal = dictId;
-        }
-        return ret;
-      } else if (currVal < dictId) {
-        throw new ISE(
-            "Skipped currValue dictId[%d], currIndex[%d]; incoming value 
dictId[%d]",
-            currVal, currIndex, dictId
-        );
-      } else {
-        return IndexSeeker.NOT_EXIST;
-      }
-    }
-  }
-
   /**
    * This method applies {@link 
DimensionMerger#convertSortedSegmentRowValuesToMergedRowValues(int, 
ColumnValueSelector)} to
    * all dimension column selectors of the given sourceRowIterator, using the 
given index number.
@@ -423,133 +345,4 @@ public interface IndexMerger
       }
     };
   }
-
-  class DictionaryMergeIterator implements CloseableIterator<String>
-  {
-    /**
-     * Don't replace this lambda with {@link Comparator#comparing} or {@link 
Comparators#naturalNullsFirst()} because
-     * this comparator is hot, so we want to avoid extra indirection layers.
-     */
-    static final Comparator<Pair<Integer, PeekingIterator<String>>> 
NULLS_FIRST_PEEKING_COMPARATOR = (lhs, rhs) -> {
-      String left = lhs.rhs.peek();
-      String right = rhs.rhs.peek();
-      if (left == null) {
-        //noinspection VariableNotUsedInsideIf
-        return right == null ? 0 : -1;
-      } else if (right == null) {
-        return 1;
-      } else {
-        return left.compareTo(right);
-      }
-    };
-
-    protected final IntBuffer[] conversions;
-    protected final List<Pair<ByteBuffer, Integer>> directBufferAllocations = 
new ArrayList<>();
-    protected final PriorityQueue<Pair<Integer, PeekingIterator<String>>> 
pQueue;
-
-    protected int counter;
-
-    DictionaryMergeIterator(Indexed<String>[] dimValueLookups, boolean 
useDirect)
-    {
-      pQueue = new PriorityQueue<>(dimValueLookups.length, 
NULLS_FIRST_PEEKING_COMPARATOR);
-      conversions = new IntBuffer[dimValueLookups.length];
-
-      long mergeBufferTotalSize = 0;
-      for (int i = 0; i < conversions.length; i++) {
-        if (dimValueLookups[i] == null) {
-          continue;
-        }
-        Indexed<String> indexed = dimValueLookups[i];
-        if (useDirect) {
-          int allocationSize = indexed.size() * Integer.BYTES;
-          log.trace("Allocating dictionary merging direct buffer with 
size[%,d]", allocationSize);
-          mergeBufferTotalSize += allocationSize;
-          final ByteBuffer conversionDirectBuffer = 
ByteBuffer.allocateDirect(allocationSize);
-          conversions[i] = conversionDirectBuffer.asIntBuffer();
-          directBufferAllocations.add(new Pair<>(conversionDirectBuffer, 
allocationSize));
-        } else {
-          conversions[i] = IntBuffer.allocate(indexed.size());
-          mergeBufferTotalSize += indexed.size();
-        }
-
-        final PeekingIterator<String> iter = Iterators.peekingIterator(
-            Iterators.transform(
-                indexed.iterator(),
-                NullHandling::nullToEmptyIfNeeded
-            )
-        );
-        if (iter.hasNext()) {
-          pQueue.add(Pair.of(i, iter));
-        }
-      }
-      log.debug("Allocated [%,d] bytes of dictionary merging direct buffers", 
mergeBufferTotalSize);
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-      return !pQueue.isEmpty();
-    }
-
-    @Override
-    public String next()
-    {
-      Pair<Integer, PeekingIterator<String>> smallest = pQueue.remove();
-      if (smallest == null) {
-        throw new NoSuchElementException();
-      }
-      final String value = writeTranslate(smallest, counter);
-
-      while (!pQueue.isEmpty() && Objects.equals(value, 
pQueue.peek().rhs.peek())) {
-        writeTranslate(pQueue.remove(), counter);
-      }
-      counter++;
-
-      return value;
-    }
-
-    boolean needConversion(int index)
-    {
-      IntBuffer readOnly = conversions[index].asReadOnlyBuffer();
-      readOnly.rewind();
-      int i = 0;
-      while (readOnly.hasRemaining()) {
-        if (i != readOnly.get()) {
-          return true;
-        }
-        i++;
-      }
-      return false;
-    }
-
-    private String writeTranslate(Pair<Integer, PeekingIterator<String>> 
smallest, int counter)
-    {
-      final int index = smallest.lhs;
-      final String value = smallest.rhs.next();
-
-      conversions[index].put(counter);
-      if (smallest.rhs.hasNext()) {
-        pQueue.add(smallest);
-      }
-      return value;
-    }
-
-    @Override
-    public void remove()
-    {
-      throw new UnsupportedOperationException("remove");
-    }
-    
-    @Override
-    public void close()
-    {
-      long mergeBufferTotalSize = 0;
-      for (Pair<ByteBuffer, Integer> bufferAllocation : 
directBufferAllocations) {
-        log.trace("Freeing dictionary merging direct buffer with size[%,d]", 
bufferAllocation.rhs);
-        mergeBufferTotalSize += bufferAllocation.rhs;
-        ByteBufferUtils.free(bufferAllocation.lhs);
-      }
-      log.debug("Freed [%,d] bytes of dictionary merging direct buffers", 
mergeBufferTotalSize);
-    }
-  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/SortedDimensionDictionary.java
 
b/processing/src/main/java/org/apache/druid/segment/SortedDimensionDictionary.java
new file mode 100644
index 0000000..934f494
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/SortedDimensionDictionary.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.objects.Object2IntRBTreeMap;
+import it.unimi.dsi.fastutil.objects.Object2IntSortedMap;
+import org.apache.druid.java.util.common.guava.Comparators;
+
+import java.util.List;
+
+/**
+ * Creates a sorted dictionary given some existing dictionary, storing a 
mapping of both sorted id to unsorted id
+ * and unsorted id to sorted id
+ */
+public class SortedDimensionDictionary<T extends Comparable<T>>
+{
+  private final List<T> sortedVals;
+  private final int[] idToIndex;
+  private final int[] indexToId;
+
+  public SortedDimensionDictionary(List<T> idToValue, int length)
+  {
+    Object2IntSortedMap<T> sortedMap = new 
Object2IntRBTreeMap<>(Comparators.naturalNullsFirst());
+    for (int id = 0; id < length; id++) {
+      T value = idToValue.get(id);
+      sortedMap.put(value, id);
+    }
+    this.sortedVals = Lists.newArrayList(sortedMap.keySet());
+    this.idToIndex = new int[length];
+    this.indexToId = new int[length];
+    int index = 0;
+    for (IntIterator iterator = sortedMap.values().iterator(); 
iterator.hasNext(); ) {
+      int id = iterator.nextInt();
+      idToIndex[id] = index;
+      indexToId[index] = id;
+      index++;
+    }
+  }
+
+  public int getUnsortedIdFromSortedId(int index)
+  {
+    return indexToId[index];
+  }
+
+  public int getSortedIdFromUnsortedId(int id)
+  {
+    return idToIndex[id];
+  }
+
+  public T getValueFromSortedId(int index)
+  {
+    return sortedVals.get(index);
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
index f2b4647..108db6e 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
@@ -21,21 +21,14 @@ package org.apache.druid.segment;
 
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 import it.unimi.dsi.fastutil.ints.IntArrays;
-import it.unimi.dsi.fastutil.ints.IntIterator;
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
-import it.unimi.dsi.fastutil.objects.Object2IntRBTreeMap;
-import it.unimi.dsi.fastutil.objects.Object2IntSortedMap;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.DimensionSpec;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.ValueMatcher;
@@ -44,9 +37,7 @@ import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.ArrayBasedIndexedInts;
-import org.apache.druid.segment.data.CloseableIndexed;
 import org.apache.druid.segment.data.IndexedInts;
-import org.apache.druid.segment.data.IndexedIterable;
 import org.apache.druid.segment.filter.BooleanValueMatcher;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexRow;
@@ -54,199 +45,25 @@ import 
org.apache.druid.segment.incremental.IncrementalIndexRowHolder;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class StringDimensionIndexer implements DimensionIndexer<Integer, 
int[], String>
+public class StringDimensionIndexer extends 
DictionaryEncodedColumnIndexer<int[], String>
 {
-
   @Nullable
   private static String emptyToNullIfNeeded(@Nullable Object o)
   {
     return o != null ? NullHandling.emptyToNullIfNeeded(o.toString()) : null;
   }
 
-  private static final int ABSENT_VALUE_ID = -1;
-
-  private static class DimensionDictionary
-  {
-    @Nullable
-    private String minValue = null;
-    @Nullable
-    private String maxValue = null;
-    private volatile int idForNull = ABSENT_VALUE_ID;
-
-    private final Object2IntMap<String> valueToId = new 
Object2IntOpenHashMap<>();
-
-    private final List<String> idToValue = new ArrayList<>();
-    private final ReentrantReadWriteLock lock;
-
-    public DimensionDictionary()
-    {
-      this.lock = new ReentrantReadWriteLock();
-      valueToId.defaultReturnValue(ABSENT_VALUE_ID);
-    }
-
-    public int getId(@Nullable String value)
-    {
-      lock.readLock().lock();
-      try {
-        if (value == null) {
-          return idForNull;
-        }
-        return valueToId.getInt(value);
-      }
-      finally {
-        lock.readLock().unlock();
-      }
-    }
-
-    @Nullable
-    public String getValue(int id)
-    {
-      lock.readLock().lock();
-      try {
-        if (id == idForNull) {
-          return null;
-        }
-        return idToValue.get(id);
-      }
-      finally {
-        lock.readLock().unlock();
-      }
-    }
-
-    public int size()
-    {
-      lock.readLock().lock();
-      try {
-        // using idToValue rather than valueToId because the valueToId doesn't 
account null value, if it is present.
-        return idToValue.size();
-      }
-      finally {
-        lock.readLock().unlock();
-      }
-    }
-
-    public int add(@Nullable String originalValue)
-    {
-      lock.writeLock().lock();
-      try {
-        if (originalValue == null) {
-          if (idForNull == ABSENT_VALUE_ID) {
-            idForNull = idToValue.size();
-            idToValue.add(null);
-          }
-          return idForNull;
-        }
-        int prev = valueToId.getInt(originalValue);
-        if (prev >= 0) {
-          return prev;
-        }
-        final int index = idToValue.size();
-        valueToId.put(originalValue, index);
-        idToValue.add(originalValue);
-        minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? 
originalValue : minValue;
-        maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? 
originalValue : maxValue;
-        return index;
-      }
-      finally {
-        lock.writeLock().unlock();
-      }
-    }
-
-    public String getMinValue()
-    {
-      lock.readLock().lock();
-      try {
-        return minValue;
-      }
-      finally {
-        lock.readLock().unlock();
-      }
-    }
-
-    public String getMaxValue()
-    {
-      lock.readLock().lock();
-      try {
-        return maxValue;
-      }
-      finally {
-        lock.readLock().unlock();
-      }
-    }
-
-    public SortedDimensionDictionary sort()
-    {
-      lock.readLock().lock();
-      try {
-        return new SortedDimensionDictionary(idToValue, idToValue.size());
-      }
-      finally {
-        lock.readLock().unlock();
-      }
-    }
-  }
-
-  private static class SortedDimensionDictionary
-  {
-    private final List<String> sortedVals;
-    private final int[] idToIndex;
-    private final int[] indexToId;
-
-    public SortedDimensionDictionary(List<String> idToValue, int length)
-    {
-      Object2IntSortedMap<String> sortedMap = new 
Object2IntRBTreeMap<>(Comparators.naturalNullsFirst());
-      for (int id = 0; id < length; id++) {
-        String value = idToValue.get(id);
-        sortedMap.put(value, id);
-      }
-      this.sortedVals = Lists.newArrayList(sortedMap.keySet());
-      this.idToIndex = new int[length];
-      this.indexToId = new int[length];
-      int index = 0;
-      for (IntIterator iterator = sortedMap.values().iterator(); 
iterator.hasNext(); ) {
-        int id = iterator.nextInt();
-        idToIndex[id] = index;
-        indexToId[index] = id;
-        index++;
-      }
-    }
-
-    public int getUnsortedIdFromSortedId(int index)
-    {
-      return indexToId[index];
-    }
-
-    public int getSortedIdFromUnsortedId(int id)
-    {
-      return idToIndex[id];
-    }
-
-    public String getValueFromSortedId(int index)
-    {
-      return sortedVals.get(index);
-    }
-  }
-
-  private final DimensionDictionary dimLookup;
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
   private final boolean hasSpatialIndexes;
   private volatile boolean hasMultipleValues = false;
-  private volatile boolean isSparse = false;
-
-  @Nullable
-  private SortedDimensionDictionary sortedLookup;
 
   public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean 
hasBitmapIndexes, boolean hasSpatialIndexes)
   {
-    this.dimLookup = new DimensionDictionary();
     this.multiValueHandling = multiValueHandling == null ? 
MultiValueHandling.ofDefault() : multiValueHandling;
     this.hasBitmapIndexes = hasBitmapIndexes;
     this.hasSpatialIndexes = hasSpatialIndexes;
@@ -260,7 +77,7 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
 
     if (dimValues == null) {
       final int nullId = dimLookup.getId(null);
-      encodedDimensionValues = nullId == ABSENT_VALUE_ID ? new 
int[]{dimLookup.add(null)} : new int[]{nullId};
+      encodedDimensionValues = nullId == DimensionDictionary.ABSENT_VALUE_ID ? 
new int[]{dimLookup.add(null)} : new int[]{nullId};
     } else if (dimValues instanceof List) {
       List<Object> dimValuesList = (List<Object>) dimValues;
       if (dimValuesList.isEmpty()) {
@@ -309,12 +126,6 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
   }
 
   @Override
-  public void setSparseIndexed()
-  {
-    isSparse = true;
-  }
-
-  @Override
   public long estimateEncodedKeyComponentSize(int[] key)
   {
     // string length is being accounted for each time they are referenced, 
based on dimension handler interface,
@@ -336,91 +147,6 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
     return estimatedSize;
   }
 
-  public Integer getSortedEncodedValueFromUnsorted(Integer 
unsortedIntermediateValue)
-  {
-    return sortedLookup().getSortedIdFromUnsortedId(unsortedIntermediateValue);
-  }
-
-  @Override
-  public Integer getUnsortedEncodedValueFromSorted(Integer 
sortedIntermediateValue)
-  {
-    return sortedLookup().getUnsortedIdFromSortedId(sortedIntermediateValue);
-  }
-
-  @Override
-  public CloseableIndexed<String> getSortedIndexedValues()
-  {
-    return new CloseableIndexed<String>()
-    {
-
-      @Override
-      public int size()
-      {
-        return getCardinality();
-      }
-
-      @Override
-      public String get(int index)
-      {
-        return getActualValue(index, true);
-      }
-
-      @Override
-      public int indexOf(String value)
-      {
-        int id = getEncodedValue(value, false);
-        return id < 0 ? ABSENT_VALUE_ID : 
getSortedEncodedValueFromUnsorted(id);
-      }
-
-      @Override
-      public Iterator<String> iterator()
-      {
-        return IndexedIterable.create(this).iterator();
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        // nothing to inspect
-      }
-
-      @Override
-      public void close()
-      {
-        // nothing to close
-      }
-    };
-  }
-
-  @Override
-  public String getMinValue()
-  {
-    return dimLookup.getMinValue();
-  }
-
-  @Override
-  public String getMaxValue()
-  {
-    return dimLookup.getMaxValue();
-  }
-
-  @Override
-  public int getCardinality()
-  {
-    return dimLookup.size();
-  }
-
-  /**
-   * returns true if all values are encoded in {@link #dimLookup}
-   */
-  private boolean dictionaryEncodesAllValues()
-  {
-    // name lookup is possible in advance if we explicitly process a value for 
every row, or if we've encountered an
-    // actual null value and it is present in our dictionary. otherwise the 
dictionary will be missing ids for implicit
-    // null values
-    return !isSparse || dimLookup.idForNull != ABSENT_VALUE_ID;
-  }
-
   @Override
   public int compareUnsortedEncodedKeyComponents(int[] lhs, int[] rhs)
   {
@@ -432,7 +158,7 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
       // if the values don't have the same length, check if we're comparing [] 
and [null], which are equivalent
       if (lhsLen + rhsLen == 1) {
         int[] longerVal = rhsLen > lhsLen ? rhs : lhs;
-        if (longerVal[0] == dimLookup.idForNull) {
+        if (longerVal[0] == dimLookup.getIdForNull()) {
           return 0;
         } else {
           //noinspection ArrayEquality -- longerVal is explicitly set to only 
lhs or rhs
@@ -506,7 +232,7 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
       capabilites.setDictionaryEncoded(true);
     }
 
-    if (isSparse || dimLookup.idForNull != ABSENT_VALUE_ID) {
+    if (isSparse || dimLookup.getIdForNull() != 
DimensionDictionary.ABSENT_VALUE_ID) {
       capabilites.setHasNulls(true);
     }
     return capabilites;
@@ -724,7 +450,7 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
         } else {
           // Can happen if a value was added to our dimLookup after this 
selector was created. Act like it
           // doesn't exist.
-          return ABSENT_VALUE_ID;
+          return DimensionDictionary.ABSENT_VALUE_ID;
         }
       }
 
@@ -762,15 +488,6 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
     return new IndexerDimensionSelector();
   }
 
-  @Override
-  public ColumnValueSelector<?> makeColumnValueSelector(
-      IncrementalIndexRowHolder currEntry,
-      IncrementalIndex.DimensionDesc desc
-  )
-  {
-    return makeDimensionSelector(DefaultDimensionSpec.of(desc.getName()), 
currEntry, desc);
-  }
-
 
   @Nullable
   @Override
@@ -792,90 +509,6 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
   }
 
   @Override
-  public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector 
selectorWithUnsortedValues)
-  {
-    DimensionSelector dimSelectorWithUnsortedValues = (DimensionSelector) 
selectorWithUnsortedValues;
-    class SortedDimensionSelector implements DimensionSelector, IndexedInts
-    {
-      @Override
-      public int size()
-      {
-        return dimSelectorWithUnsortedValues.getRow().size();
-      }
-
-      @Override
-      public int get(int index)
-      {
-        return 
sortedLookup().getSortedIdFromUnsortedId(dimSelectorWithUnsortedValues.getRow().get(index));
-      }
-
-      @Override
-      public IndexedInts getRow()
-      {
-        return this;
-      }
-
-      @Override
-      public ValueMatcher makeValueMatcher(@Nullable String value)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public ValueMatcher makeValueMatcher(Predicate<String> predicate)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public int getValueCardinality()
-      {
-        return dimSelectorWithUnsortedValues.getValueCardinality();
-      }
-
-      @Nullable
-      @Override
-      public String lookupName(int id)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public boolean nameLookupPossibleInAdvance()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Nullable
-      @Override
-      public IdLookup idLookup()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        inspector.visit("dimSelectorWithUnsortedValues", 
dimSelectorWithUnsortedValues);
-      }
-
-      @Nullable
-      @Override
-      public Object getObject()
-      {
-        return dimSelectorWithUnsortedValues.getObject();
-      }
-
-      @Override
-      public Class classOfObject()
-      {
-        return dimSelectorWithUnsortedValues.classOfObject();
-      }
-    }
-    return new SortedDimensionSelector();
-  }
-
-  @Override
   public void fillBitmapsFromUnsortedEncodedKeyComponent(
       int[] key,
       int rowNum,
@@ -894,31 +527,4 @@ public class StringDimensionIndexer implements 
DimensionIndexer<Integer, int[],
       bitmapIndexes[dimValIdx].add(rowNum);
     }
   }
-
-  private SortedDimensionDictionary sortedLookup()
-  {
-    return sortedLookup == null ? sortedLookup = dimLookup.sort() : 
sortedLookup;
-  }
-
-  @Nullable
-  private String getActualValue(int intermediateValue, boolean idSorted)
-  {
-    if (idSorted) {
-      return sortedLookup().getValueFromSortedId(intermediateValue);
-    } else {
-      return dimLookup.getValue(intermediateValue);
-
-    }
-  }
-
-  private int getEncodedValue(String fullValue, boolean idSorted)
-  {
-    int unsortedId = dimLookup.getId(fullValue);
-
-    if (idSorted) {
-      return sortedLookup().getSortedIdFromUnsortedId(unsortedId);
-    } else {
-      return unsortedId;
-    }
-  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index abb4637..e4e0fbd 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -19,94 +19,47 @@
 
 package org.apache.druid.segment;
 
-import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
-import it.unimi.dsi.fastutil.ints.IntIterable;
-import it.unimi.dsi.fastutil.ints.IntIterator;
+import com.google.common.collect.PeekingIterator;
 import org.apache.druid.collections.bitmap.BitmapFactory;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.collections.spatial.ImmutableRTree;
 import org.apache.druid.collections.spatial.RTree;
 import org.apache.druid.collections.spatial.split.LinearGutmanSplitStrategy;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.filter.ValueMatcher;
-import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnDescriptor;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
-import org.apache.druid.segment.data.BitmapValues;
 import org.apache.druid.segment.data.ByteBufferWriter;
-import org.apache.druid.segment.data.CloseableIndexed;
-import org.apache.druid.segment.data.ColumnarIntsSerializer;
-import org.apache.druid.segment.data.ColumnarMultiIntsSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.GenericIndexed;
-import org.apache.druid.segment.data.GenericIndexedWriter;
 import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy;
 import org.apache.druid.segment.data.Indexed;
-import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.data.ListIndexed;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
-import 
org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
-import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
-import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
+import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
-public class StringDimensionMergerV9 implements DimensionMergerV9
+public class StringDimensionMergerV9 extends 
DictionaryEncodedColumnMerger<String>
 {
-  private static final Logger log = new Logger(StringDimensionMergerV9.class);
-
   private static final Indexed<String> NULL_STR_DIM_VAL = new 
ListIndexed<>(Collections.singletonList(null));
   private static final Splitter SPLITTER = Splitter.on(",");
 
-  private final String dimensionName;
-  private final ProgressIndicator progress;
-  private final Closer closer;
-  private final IndexSpec indexSpec;
-  private final SegmentWriteOutMedium segmentWriteOutMedium;
-  private final MutableBitmap nullRowsBitmap;
-  private final ColumnCapabilities capabilities;
-
-  private int dictionarySize;
-  private int rowCount = 0;
-  private int cardinality = 0;
-  private boolean hasNull = false;
+  public static final Comparator<Pair<Integer, PeekingIterator<String>>> 
DICTIONARY_MERGING_COMPARATOR =
+      DictionaryMergingIterator.makePeekingComparator();
 
   @Nullable
-  private GenericIndexedWriter<ImmutableBitmap> bitmapWriter;
-  @Nullable
   private ByteBufferWriter<ImmutableRTree> spatialWriter;
-  @Nullable
-  private ArrayList<IntBuffer> dimConversions;
-  @Nullable
-  private List<IndexableAdapter> adapters;
-  @Nullable
-  private IndexMerger.DictionaryMergeIterator dictionaryMergeIterator;
-  @Nullable
-  private ColumnarIntsSerializer encodedValueSerializer;
-  @Nullable
-  private GenericIndexedWriter<String> dictionaryWriter;
-  @Nullable
-  private String firstDictionaryValue;
-
 
   public StringDimensionMergerV9(
       String dimensionName,
@@ -117,416 +70,38 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
       Closer closer
   )
   {
-    this.dimensionName = dimensionName;
-    this.indexSpec = indexSpec;
-    this.capabilities = capabilities;
-    this.segmentWriteOutMedium = segmentWriteOutMedium;
-    nullRowsBitmap = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-
-    this.progress = progress;
-    this.closer = closer;
+    super(dimensionName, indexSpec, segmentWriteOutMedium, capabilities, 
progress, closer);
   }
 
   @Override
-  public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
+  protected Comparator<Pair<Integer, PeekingIterator<String>>> 
getDictionaryMergingComparator()
   {
-    boolean dimHasValues = false;
-    boolean dimAbsentFromSomeIndex = false;
-
-    long dimStartTime = System.currentTimeMillis();
-
-    this.adapters = adapters;
-
-    dimConversions = Lists.newArrayListWithCapacity(adapters.size());
-    for (int i = 0; i < adapters.size(); ++i) {
-      dimConversions.add(null);
-    }
-
-    int numMergeIndex = 0;
-    Indexed<String> dimValueLookup = null;
-    Indexed<String>[] dimValueLookups = new Indexed[adapters.size() + 1];
-    for (int i = 0; i < adapters.size(); i++) {
-      @SuppressWarnings("MustBeClosedChecker") // we register dimValues in the 
closer
-          Indexed<String> dimValues = 
closer.register(adapters.get(i).getDimValueLookup(dimensionName));
-      if (dimValues != null && !allNull(dimValues)) {
-        dimHasValues = true;
-        hasNull |= dimValues.indexOf(null) >= 0;
-        dimValueLookups[i] = dimValueLookup = dimValues;
-        numMergeIndex++;
-      } else {
-        dimAbsentFromSomeIndex = true;
-      }
-    }
-
-    boolean convertMissingValues = dimHasValues && dimAbsentFromSomeIndex;
-
-    /*
-     * Ensure the empty str is always in the dictionary if the dimension was 
missing from one index but
-     * has non-null values in another index.
-     * This is done so that IndexMerger.toMergedIndexRowIterator() can convert 
null columns to empty strings
-     * later on, to allow rows from indexes without a particular dimension to 
merge correctly with
-     * rows from indexes with null/empty str values for that dimension.
-     */
-    if (convertMissingValues && !hasNull) {
-      hasNull = true;
-      dimValueLookups[adapters.size()] = dimValueLookup = NULL_STR_DIM_VAL;
-      numMergeIndex++;
-    }
-
-    String dictFilename = StringUtils.format("%s.dim_values", dimensionName);
-    dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, 
dictFilename, GenericIndexed.STRING_STRATEGY);
-    firstDictionaryValue = null;
-    dictionarySize = 0;
-    dictionaryWriter.open();
-
-    cardinality = 0;
-    if (numMergeIndex > 1) {
-      dictionaryMergeIterator = new 
IndexMerger.DictionaryMergeIterator(dimValueLookups, true);
-      writeDictionary(() -> dictionaryMergeIterator);
-      for (int i = 0; i < adapters.size(); i++) {
-        if (dimValueLookups[i] != null && 
dictionaryMergeIterator.needConversion(i)) {
-          dimConversions.set(i, dictionaryMergeIterator.conversions[i]);
-        }
-      }
-      cardinality = dictionaryMergeIterator.counter;
-    } else if (numMergeIndex == 1) {
-      writeDictionary(dimValueLookup);
-      cardinality = dimValueLookup.size();
-    }
-
-    log.debug(
-        "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
-        dimensionName,
-        cardinality,
-        System.currentTimeMillis() - dimStartTime
-    );
-
-    setupEncodedValueWriter();
-  }
-
-  private void writeDictionary(Iterable<String> dictionaryValues) throws 
IOException
-  {
-    for (String value : dictionaryValues) {
-      dictionaryWriter.write(value);
-      value = NullHandling.emptyToNullIfNeeded(value);
-      if (dictionarySize == 0) {
-        firstDictionaryValue = value;
-      }
-      dictionarySize++;
-    }
-  }
-
-  protected void setupEncodedValueWriter() throws IOException
-  {
-    final CompressionStrategy compressionStrategy = 
indexSpec.getDimensionCompression();
-
-    String filenameBase = StringUtils.format("%s.forward_dim", dimensionName);
-    if (capabilities.hasMultipleValues().isTrue()) {
-      if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
-        encodedValueSerializer = 
V3CompressedVSizeColumnarMultiIntsSerializer.create(
-            dimensionName,
-            segmentWriteOutMedium,
-            filenameBase,
-            cardinality,
-            compressionStrategy
-        );
-      } else {
-        encodedValueSerializer =
-            new VSizeColumnarMultiIntsSerializer(dimensionName, 
segmentWriteOutMedium, cardinality);
-      }
-    } else {
-      if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
-        encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-            dimensionName,
-            segmentWriteOutMedium,
-            filenameBase,
-            cardinality,
-            compressionStrategy
-        );
-      } else {
-        encodedValueSerializer = new 
VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
-      }
-    }
-    encodedValueSerializer.open();
+    return DICTIONARY_MERGING_COMPARATOR;
   }
 
   @Override
-  public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues(
-      int segmentIndex,
-      ColumnValueSelector source
-  )
+  protected Indexed<String> getNullDimValue()
   {
-    IntBuffer converter = dimConversions.get(segmentIndex);
-    if (converter == null) {
-      return source;
-    }
-    DimensionSelector sourceDimensionSelector = (DimensionSelector) source;
-
-    IndexedInts convertedRow = new IndexedInts()
-    {
-      @Override
-      public int size()
-      {
-        return sourceDimensionSelector.getRow().size();
-      }
-
-      @Override
-      public int get(int index)
-      {
-        return converter.get(sourceDimensionSelector.getRow().get(index));
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        inspector.visit("source", source);
-        inspector.visit("converter", converter);
-      }
-    };
-    return new DimensionSelector()
-    {
-      @Override
-      public IndexedInts getRow()
-      {
-        return convertedRow;
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        inspector.visit("convertedRow", convertedRow);
-      }
-
-      @Override
-      public ValueMatcher makeValueMatcher(String value)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public ValueMatcher makeValueMatcher(Predicate<String> predicate)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public int getValueCardinality()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Nullable
-      @Override
-      public String lookupName(int id)
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public boolean nameLookupPossibleInAdvance()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Nullable
-      @Override
-      public IdLookup idLookup()
-      {
-        throw new UnsupportedOperationException();
-      }
-
-      @Nullable
-      @Override
-      public Object getObject()
-      {
-        return sourceDimensionSelector.getObject();
-      }
-
-      @Override
-      public Class classOfObject()
-      {
-        return sourceDimensionSelector.classOfObject();
-      }
-    };
+    return NULL_STR_DIM_VAL;
   }
 
   @Override
-  public void processMergedRow(ColumnValueSelector selector) throws IOException
-  {
-    IndexedInts row = getRow(selector);
-    int rowSize = row.size();
-    if (rowSize == 0) {
-      nullRowsBitmap.add(rowCount);
-    } else if (hasNull && isNullRow(row, rowSize)) {
-      // If this dimension has the null/empty str in its dictionary, a row 
with nulls at all positions should also be
-      // added to nullRowBitmap.
-      nullRowsBitmap.add(rowCount);
-    }
-    if (encodedValueSerializer instanceof ColumnarMultiIntsSerializer) {
-      ((ColumnarMultiIntsSerializer) encodedValueSerializer).addValues(row);
-    } else {
-      int value = row.size() == 0 ? 0 : row.get(0);
-      ((SingleValueColumnarIntsSerializer) 
encodedValueSerializer).addValue(value);
-    }
-    rowCount++;
-  }
-
-  private static IndexedInts getRow(ColumnValueSelector s)
-  {
-    if (s instanceof DimensionSelector) {
-      return ((DimensionSelector) s).getRow();
-    } else if (s instanceof NilColumnValueSelector) {
-      return IndexedInts.empty();
-    } else {
-      throw new ISE(
-          "ColumnValueSelector[%s], only DimensionSelector or 
NilColumnValueSelector is supported",
-          s.getClass()
-      );
-    }
-  }
-
-  private static boolean isNullRow(IndexedInts row, int size)
+  protected ObjectStrategy<String> getObjectStrategy()
   {
-    for (int i = 0; i < size; i++) {
-      if (row.get(i) != 0) {
-        return false;
-      }
-    }
-    return true;
+    return GenericIndexed.STRING_STRATEGY;
   }
 
   @Override
-  public void writeIndexes(@Nullable List<IntBuffer> segmentRowNumConversions) 
throws IOException
+  protected String coerceValue(String value)
   {
-    if (!capabilities.hasBitmapIndexes()) {
-      return;
-    }
-
-    long dimStartTime = System.currentTimeMillis();
-    final BitmapSerdeFactory bitmapSerdeFactory = 
indexSpec.getBitmapSerdeFactory();
-
-    String bmpFilename = StringUtils.format("%s.inverted", dimensionName);
-    bitmapWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        bmpFilename,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapWriter.open();
-    bitmapWriter.setObjectsNotSorted();
-
-    BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
-
-    RTree tree = null;
-    boolean hasSpatial = capabilities.hasSpatialIndexes();
-    if (hasSpatial) {
-      spatialWriter = new ByteBufferWriter<>(
-          segmentWriteOutMedium,
-          new ImmutableRTreeObjectStrategy(bitmapFactory)
-      );
-      spatialWriter.open();
-      tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), 
bitmapFactory);
-    }
-
-    IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, 
dimensionName);
-
-    //Iterate all dim values's dictionary id in ascending order which in line 
with dim values's compare result.
-    for (int dictId = 0; dictId < dictionarySize; dictId++) {
-      progress.progress();
-      mergeBitmaps(
-          segmentRowNumConversions,
-          bitmapFactory,
-          tree,
-          hasSpatial,
-          dictIdSeeker,
-          dictId
-      );
-    }
-
-    if (hasSpatial) {
-      spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
-    }
-
-    log.debug(
-        "Completed dim[%s] inverted with cardinality[%,d] in %,d millis.",
-        dimensionName,
-        dictionarySize,
-        System.currentTimeMillis() - dimStartTime
-    );
-
-    if (dictionaryMergeIterator != null) {
-      dictionaryMergeIterator.close();
-    }
-  }
-
-  void mergeBitmaps(
-      @Nullable List<IntBuffer> segmentRowNumConversions,
-      BitmapFactory bmpFactory,
-      RTree tree,
-      boolean hasSpatial,
-      IndexSeeker[] dictIdSeeker,
-      int dictId
-  ) throws IOException
-  {
-    List<IntIterable> convertedInvertedIndexesToMerge = 
Lists.newArrayListWithCapacity(adapters.size());
-    for (int j = 0; j < adapters.size(); ++j) {
-      int seekedDictId = dictIdSeeker[j].seek(dictId);
-      if (seekedDictId != IndexSeeker.NOT_EXIST) {
-        IntIterable values;
-        if (segmentRowNumConversions != null) {
-          values = new ConvertingBitmapValues(
-              adapters.get(j).getBitmapValues(dimensionName, seekedDictId),
-              segmentRowNumConversions.get(j)
-          );
-        } else {
-          BitmapValues bitmapValues = 
adapters.get(j).getBitmapValues(dimensionName, seekedDictId);
-          values = bitmapValues::iterator;
-        }
-        convertedInvertedIndexesToMerge.add(values);
-      }
-    }
-
-    MutableBitmap mergedIndexes = bmpFactory.makeEmptyMutableBitmap();
-    List<IntIterator> convertedInvertedIndexesIterators = new 
ArrayList<>(convertedInvertedIndexesToMerge.size());
-    for (IntIterable convertedInvertedIndexes : 
convertedInvertedIndexesToMerge) {
-      
convertedInvertedIndexesIterators.add(convertedInvertedIndexes.iterator());
-    }
-
-    // Merge ascending index iterators into a single one, remove duplicates, 
and add to the mergedIndexes bitmap.
-    // Merge is needed, because some compacting MutableBitmap implementations 
are very inefficient when bits are
-    // added not in the ascending order.
-    int prevRow = IndexMerger.INVALID_ROW;
-    for (IntIterator mergeIt = 
IntIteratorUtils.mergeAscending(convertedInvertedIndexesIterators);
-         mergeIt.hasNext(); ) {
-      int row = mergeIt.nextInt();
-      if (row != prevRow && row != IndexMerger.INVALID_ROW) {
-        mergedIndexes.add(row);
-      }
-      prevRow = row;
-    }
-
-    if (dictId == 0 && firstDictionaryValue == null) {
-      mergedIndexes.or(nullRowsBitmap);
-    }
-
-    bitmapWriter.write(bmpFactory.makeImmutableBitmap(mergedIndexes));
-
-    if (hasSpatial) {
-      String dimVal = dictionaryWriter.get(dictId);
-      if (dimVal != null) {
-        List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
-        float[] coords = new float[stringCoords.size()];
-        for (int j = 0; j < coords.length; j++) {
-          coords[j] = Float.valueOf(stringCoords.get(j));
-        }
-        tree.insert(coords, mergedIndexes);
-      }
-    }
+    return NullHandling.emptyToNullIfNeeded(value);
   }
 
+  @Nullable
   @Override
-  public boolean canSkip()
+  protected ExtendedIndexesMerger getExtendedIndexesMerger()
   {
-    return cardinality == 0;
+    return new SpatialIndexesMerger();
   }
 
   @Override
@@ -558,155 +133,54 @@ public class StringDimensionMergerV9 implements 
DimensionMergerV9
         .build();
   }
 
-  protected interface IndexSeeker
-  {
-    int NOT_EXIST = -1;
-    int NOT_INIT = -1;
-
-    int seek(int dictId);
-  }
-
-  protected static class IndexSeekerWithoutConversion implements IndexSeeker
-  {
-    private final int limit;
-
-    public IndexSeekerWithoutConversion(int limit)
-    {
-      this.limit = limit;
-    }
-
-    @Override
-    public int seek(int dictId)
-    {
-      return dictId < limit ? dictId : NOT_EXIST;
-    }
-  }
-
   /**
-   * Get old dictId from new dictId, and only support access in order
+   * Write spatial indexes for string columns that have them
    */
-  protected static class IndexSeekerWithConversion implements IndexSeeker
+  public class SpatialIndexesMerger implements ExtendedIndexesMerger
   {
-    private final IntBuffer dimConversions;
-    private int currIndex;
-    private int currVal;
-    private int lastVal;
-
-    IndexSeekerWithConversion(IntBuffer dimConversions)
-    {
-      this.dimConversions = dimConversions;
-      this.currIndex = 0;
-      this.currVal = NOT_INIT;
-      this.lastVal = NOT_INIT;
-    }
+    private RTree tree;
+    private final boolean hasSpatial = capabilities.hasSpatialIndexes();
 
     @Override
-    public int seek(int dictId)
+    public void initialize() throws IOException
     {
-      if (dimConversions == null) {
-        return NOT_EXIST;
-      }
-      if (lastVal != NOT_INIT) {
-        if (dictId <= lastVal) {
-          throw new ISE(
-              "Value dictId[%d] is less than the last value dictId[%d] I have, 
cannot be.",
-              dictId, lastVal
-          );
-        }
-        return NOT_EXIST;
-      }
-      if (currVal == NOT_INIT) {
-        currVal = dimConversions.get();
-      }
-      if (currVal == dictId) {
-        int ret = currIndex;
-        ++currIndex;
-        if (dimConversions.hasRemaining()) {
-          currVal = dimConversions.get();
-        } else {
-          lastVal = dictId;
-        }
-        return ret;
-      } else if (currVal < dictId) {
-        throw new ISE(
-            "Skipped currValue dictId[%d], currIndex[%d]; incoming value 
dictId[%d]",
-            currVal, currIndex, dictId
+      BitmapFactory bitmapFactory = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory();
+      if (hasSpatial) {
+        spatialWriter = new ByteBufferWriter<>(
+            segmentWriteOutMedium,
+            new ImmutableRTreeObjectStrategy(bitmapFactory)
+        );
+        spatialWriter.open();
+        tree = new RTree(
+            2,
+            new LinearGutmanSplitStrategy(0, 50, bitmapFactory),
+            bitmapFactory
         );
-      } else {
-        return NOT_EXIST;
       }
     }
-  }
 
-  public static class ConvertingBitmapValues implements IntIterable
-  {
-    private final BitmapValues baseValues;
-    private final IntBuffer conversionBuffer;
-
-    ConvertingBitmapValues(BitmapValues baseValues, IntBuffer conversionBuffer)
-    {
-      this.baseValues = baseValues;
-      this.conversionBuffer = conversionBuffer;
-    }
-
-    @Nonnull
     @Override
-    public IntIterator iterator()
+    public void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws 
IOException
     {
-      final IntIterator baseIterator = baseValues.iterator();
-      return new IntIterator()
-      {
-        @Override
-        public boolean hasNext()
-        {
-          return baseIterator.hasNext();
-        }
-
-        @Override
-        public int nextInt()
-        {
-          return conversionBuffer.get(baseIterator.nextInt());
-        }
-
-        @Override
-        public int skip(int n)
-        {
-          return IntIteratorUtils.skip(baseIterator, n);
-        }
-      };
-    }
-  }
-
-  protected IndexSeeker[] toIndexSeekers(
-      List<IndexableAdapter> adapters,
-      ArrayList<IntBuffer> dimConversions,
-      String dimension
-  )
-  {
-    IndexSeeker[] seekers = new IndexSeeker[adapters.size()];
-    for (int i = 0; i < adapters.size(); i++) {
-      IntBuffer dimConversion = dimConversions.get(i);
-      if (dimConversion != null) {
-        seekers[i] = new IndexSeekerWithConversion((IntBuffer) 
dimConversion.asReadOnlyBuffer().rewind());
-      } else {
-        try (CloseableIndexed<String> dimValueLookup = 
adapters.get(i).getDimValueLookup(dimension)) {
-          seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null 
? 0 : dimValueLookup.size());
-        }
-        catch (IOException e) {
-          throw new UncheckedIOException(e);
+      if (hasSpatial) {
+        String dimVal = dictionaryWriter.get(dictId);
+        if (dimVal != null) {
+          List<String> stringCoords = 
Lists.newArrayList(SPLITTER.split(dimVal));
+          float[] coords = new float[stringCoords.size()];
+          for (int j = 0; j < coords.length; j++) {
+            coords[j] = Float.valueOf(stringCoords.get(j));
+          }
+          tree.insert(coords, mergedIndexes);
         }
       }
     }
-    return seekers;
-  }
 
-  private boolean allNull(Indexed<String> dimValues)
-  {
-    for (int i = 0, size = dimValues.size(); i < size; i++) {
-      if (dimValues.get(i) != null) {
-        return false;
+    @Override
+    public void write() throws IOException
+    {
+      if (hasSpatial) {
+        spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
       }
     }
-    return true;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java 
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index 178dd57..a85102d 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -199,7 +199,7 @@ public class GenericIndexed<T> implements 
CloseableIndexed<T>, Serializer
     return fromIterable(Arrays.asList(objects), strategy);
   }
 
-  static GenericIndexed<ResourceHolder<ByteBuffer>> ofCompressedByteBuffers(
+  public static GenericIndexed<ResourceHolder<ByteBuffer>> 
ofCompressedByteBuffers(
       Iterable<ByteBuffer> buffers,
       CompressionStrategy compression,
       int bufferSize,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index d4254ddd..6db18d0 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -70,7 +70,7 @@ public class GenericIndexedWriter<T> implements Serializer
       .writeByteArray(x -> x.fileNameByteArray);
 
 
-  static GenericIndexedWriter<ByteBuffer> ofCompressedByteBuffers(
+  public static GenericIndexedWriter<ByteBuffer> ofCompressedByteBuffers(
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
       final CompressionStrategy compressionStrategy,
@@ -86,7 +86,7 @@ public class GenericIndexedWriter<T> implements Serializer
     return writer;
   }
 
-  static ObjectStrategy<ByteBuffer> compressedByteBuffersWriteObjectStrategy(
+  public static ObjectStrategy<ByteBuffer> 
compressedByteBuffersWriteObjectStrategy(
       final CompressionStrategy compressionStrategy,
       final int bufferSize,
       final Closer closer
diff --git 
a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
 
b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
index effb4d3..848645d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnPartSerde.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.serde;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnCapabilities;
 
 import javax.annotation.Nullable;
@@ -95,7 +94,7 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
     @Nullable
     private String typeName = null;
     @Nullable
-    private GenericColumnSerializer delegate = null;
+    private Serializer delegate = null;
 
     public SerializerBuilder withTypeName(final String typeName)
     {
@@ -103,7 +102,7 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
       return this;
     }
 
-    public SerializerBuilder withDelegate(final GenericColumnSerializer delegate)
+    public SerializerBuilder withDelegate(final Serializer delegate)
     {
       this.delegate = delegate;
       return this;
diff --git a/processing/src/test/java/org/apache/druid/segment/DictionaryMergeIteratorTest.java b/processing/src/test/java/org/apache/druid/segment/DictionaryMergingIteratorTest.java
similarity index 90%
rename from processing/src/test/java/org/apache/druid/segment/DictionaryMergeIteratorTest.java
rename to processing/src/test/java/org/apache/druid/segment/DictionaryMergingIteratorTest.java
index c4bf836..e8a8a9b 100644
--- a/processing/src/test/java/org/apache/druid/segment/DictionaryMergeIteratorTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/DictionaryMergingIteratorTest.java
@@ -27,9 +27,8 @@ import org.junit.Test;
 
 /**
  */
-public class DictionaryMergeIteratorTest
+public class DictionaryMergingIteratorTest
 {
-
   @Test
   public void basicTest()
   {
@@ -45,7 +44,11 @@ public class DictionaryMergeIteratorTest
     Indexed<String> i4 = new ListIndexed<String>(s4);
     Indexed<String> i5 = new ListIndexed<String>(s5);
 
-    IndexMerger.DictionaryMergeIterator iterator = new IndexMerger.DictionaryMergeIterator(new Indexed[]{i1, i2, i3, i4, i5}, false);
+    DictionaryMergingIterator<String> iterator = new DictionaryMergingIterator<>(
+        new Indexed[]{i1, i2, i3, i4, i5},
+        StringDimensionMergerV9.DICTIONARY_MERGING_COMPARATOR,
+        false
+    );
 
     Assert.assertArrayEquals(new String[]{"a", "b", "c", "d", "e", "f"}, Iterators.toArray(iterator, String.class));
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 9f3d59a..0929e79 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -39,7 +39,6 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -71,8 +70,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -2006,31 +2003,6 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testDictIdSeeker()
-  {
-    IntBuffer dimConversions = ByteBuffer.allocateDirect(3 * Integer.BYTES).asIntBuffer();
-    dimConversions.put(0);
-    dimConversions.put(2);
-    dimConversions.put(4);
-    IndexMerger.IndexSeeker dictIdSeeker = new IndexMerger.IndexSeekerWithConversion(
-        (IntBuffer) dimConversions.asReadOnlyBuffer().rewind()
-    );
-    Assert.assertEquals(0, dictIdSeeker.seek(0));
-    Assert.assertEquals(-1, dictIdSeeker.seek(1));
-    Assert.assertEquals(1, dictIdSeeker.seek(2));
-    try {
-      dictIdSeeker.seek(5);
-      Assert.fail("Only support access in order");
-    }
-    catch (ISE ise) {
-      Assert.assertTrue("Only support access in order", true);
-    }
-    Assert.assertEquals(-1, dictIdSeeker.seek(3));
-    Assert.assertEquals(2, dictIdSeeker.seek(4));
-    Assert.assertEquals(-1, dictIdSeeker.seek(5));
-  }
-
-  @Test
   public void testMultiValueHandling() throws Exception
   {
     InputRow[] rows = new InputRow[]{

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to