This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/27.0.0 by this push:
     new 6eb2695831 reduce heap footprint of ingesting auto typed columns by 
pushing compression and index generation into writeTo (#14615) (#14627)
6eb2695831 is described below

commit 6eb26958319774e71048b8334951368ee18b6366
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Jul 20 04:34:06 2023 -0700

    reduce heap footprint of ingesting auto typed columns by pushing 
compression and index generation into writeTo (#14615) (#14627)
---
 .../apache/druid/query/filter/EqualityFilter.java  |   8 +-
 .../druid/segment/data/FixedIndexedWriter.java     |  13 +-
 .../segment/index/AllFalseBitmapColumnIndex.java   |  12 +-
 .../segment/index/IndexedUtf8ValueIndexes.java     |   3 +-
 ...{ValueIndexes.java => ArrayElementIndexes.java} |  16 +-
 .../druid/segment/index/semantic/ValueIndexes.java |   9 +-
 .../nested/ScalarDoubleColumnAndIndexSupplier.java |   4 +-
 .../nested/ScalarDoubleColumnSerializer.java       | 172 +++--------------
 .../nested/ScalarLongColumnAndIndexSupplier.java   |   5 +-
 .../segment/nested/ScalarLongColumnSerializer.java | 172 +++--------------
 ... ScalarNestedCommonFormatColumnSerializer.java} | 215 ++++++++++-----------
 .../nested/ScalarStringColumnSerializer.java       | 151 ++-------------
 .../nested/VariantColumnAndIndexSupplier.java      | 164 ++++++++++++----
 .../segment/nested/VariantColumnSerializer.java    | 174 +++++++++--------
 .../nested/ScalarDoubleColumnSupplierTest.java     |   3 +
 .../nested/ScalarLongColumnSupplierTest.java       |   3 +
 .../nested/ScalarStringColumnSupplierTest.java     |   3 +
 .../segment/nested/VariantColumnSupplierTest.java  |  18 ++
 18 files changed, 458 insertions(+), 687 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java 
b/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java
index 0e57b06141..ba6d17ccc4 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java
@@ -99,11 +99,11 @@ public class EqualityFilter extends 
AbstractOptimizableDimFilter implements Filt
       throw InvalidInput.exception("Invalid equality filter on column [%s], 
matchValueType cannot be null", column);
     }
     this.matchValueType = matchValueType;
-    if (matchValue == null) {
-      throw InvalidInput.exception("Invalid equality filter on column [%s], 
matchValue cannot be null", column);
-    }
     this.matchValue = matchValue;
     this.matchValueEval = 
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(matchValueType), 
matchValue);
+    if (matchValueEval.value() == null) {
+      throw InvalidInput.exception("Invalid equality filter on column [%s], 
matchValue cannot be null", column);
+    }
     this.filterTuning = filterTuning;
     this.predicateFactory = new EqualityPredicateFactory(matchValueEval);
   }
@@ -239,6 +239,8 @@ public class EqualityFilter extends 
AbstractOptimizableDimFilter implements Filt
 
     final ValueIndexes valueIndexes = indexSupplier.as(ValueIndexes.class);
     if (valueIndexes != null) {
+      // matchValueEval.value() cannot be null here due to check in the 
constructor
+      //noinspection DataFlowIssue
       return valueIndexes.forValue(matchValueEval.value(), matchValueType);
     }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
index 76944d2dc4..0146856657 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
@@ -24,7 +24,6 @@ import org.apache.druid.io.Channels;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.column.TypeStrategy;
-import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
@@ -39,7 +38,7 @@ import java.util.Iterator;
 /**
  * Writer for a {@link FixedIndexed}
  */
-public class FixedIndexedWriter<T> implements Serializer
+public class FixedIndexedWriter<T> implements DictionaryWriter<T>
 {
   private static final int PAGE_SIZE = 4096;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
@@ -73,11 +72,19 @@ public class FixedIndexedWriter<T> implements Serializer
     this.isSorted = isSorted;
   }
 
+  @Override
+  public boolean isSorted()
+  {
+    return isSorted;
+  }
+
+  @Override
   public void open() throws IOException
   {
     this.valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
   }
 
+  @Override
   public int getCardinality()
   {
     return hasNulls ? numWritten + 1 : numWritten;
@@ -89,6 +96,7 @@ public class FixedIndexedWriter<T> implements Serializer
     return Byte.BYTES + Byte.BYTES + Integer.BYTES + valuesOut.size();
   }
 
+  @Override
   public void write(@Nullable T objectToWrite) throws IOException
   {
     if (prevObject != null && isSorted && comparator.compare(prevObject, 
objectToWrite) >= 0) {
@@ -140,6 +148,7 @@ public class FixedIndexedWriter<T> implements Serializer
   }
 
   @SuppressWarnings("unused")
+  @Override
   @Nullable
   public T get(int index) throws IOException
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
 
b/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
index fb986c51d9..a7518d44c6 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.index;
 
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.filter.ColumnIndexSelector;
 import org.apache.druid.segment.column.ColumnIndexCapabilities;
@@ -26,11 +27,16 @@ import 
org.apache.druid.segment.column.SimpleColumnIndexCapabilities;
 
 public class AllFalseBitmapColumnIndex implements BitmapColumnIndex
 {
-  private final ColumnIndexSelector selector;
+  private final BitmapFactory bitmapFactory;
 
   public AllFalseBitmapColumnIndex(ColumnIndexSelector indexSelector)
   {
-    this.selector = indexSelector;
+    this(indexSelector.getBitmapFactory());
+  }
+
+  public AllFalseBitmapColumnIndex(BitmapFactory bitmapFactory)
+  {
+    this.bitmapFactory = bitmapFactory;
   }
 
   @Override
@@ -48,6 +54,6 @@ public class AllFalseBitmapColumnIndex implements 
BitmapColumnIndex
   @Override
   public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
   {
-    return 
bitmapResultFactory.wrapAllFalse(selector.getBitmapFactory().makeEmptyImmutableBitmap());
+    return 
bitmapResultFactory.wrapAllFalse(bitmapFactory.makeEmptyImmutableBitmap());
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
 
b/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
index 2510597470..143103cb62 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
@@ -37,6 +37,7 @@ import 
org.apache.druid.segment.index.semantic.StringValueSetIndexes;
 import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
 import org.apache.druid.segment.index.semantic.ValueIndexes;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
@@ -101,7 +102,7 @@ public final class IndexedUtf8ValueIndexes<TDictionary 
extends Indexed<ByteBuffe
 
   @Nullable
   @Override
-  public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> 
valueType)
+  public BitmapColumnIndex forValue(@Nonnull Object value, 
TypeSignature<ValueType> valueType)
   {
     if (valueType.isPrimitive()) {
       return forValue(
diff --git 
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
 
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ArrayElementIndexes.java
similarity index 68%
copy from 
processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
copy to 
processing/src/main/java/org/apache/druid/segment/index/semantic/ArrayElementIndexes.java
index 28fcf0ae9b..5a8a9d841b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ArrayElementIndexes.java
@@ -26,19 +26,19 @@ import org.apache.druid.segment.index.BitmapColumnIndex;
 
 import javax.annotation.Nullable;
 
-public interface ValueIndexes
+public interface ArrayElementIndexes
 {
-
   /**
-   * Get the {@link ImmutableBitmap} corresponding to the supplied value.  
Generates an empty bitmap when passed a
-   * value that doesn't exist. May return null if a value index cannot be 
computed for the supplied value type.
+   * Get the {@link ImmutableBitmap} corresponding to rows with array elements 
matching the supplied value.  Generates
+   * an empty bitmap when passed a value that doesn't exist in any array. May 
return null if a value index cannot be
+   * computed for the supplied value type.
    *
-   * @param value       value to match
+   * @param value       value to match against any array element in a row
    * @param valueType   type of the value to match, used to assist conversion 
from the match value type to the column
    *                    value type
-   * @return            {@link ImmutableBitmap} corresponding to the rows 
which match the value, or null if an index
-   *                    connot be computed for the supplied value type
+   * @return            {@link ImmutableBitmap} corresponding to the rows with 
array elements which match the value, or
+   *                    null if an index cannot be computed for the supplied 
value type
    */
   @Nullable
-  BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType);
+  BitmapColumnIndex containsValue(@Nullable Object value, 
TypeSignature<ValueType> valueType);
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
 
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
index 28fcf0ae9b..9eee56896d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
@@ -24,14 +24,17 @@ import org.apache.druid.segment.column.TypeSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.index.BitmapColumnIndex;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 public interface ValueIndexes
 {
 
   /**
-   * Get the {@link ImmutableBitmap} corresponding to the supplied value.  
Generates an empty bitmap when passed a
-   * value that doesn't exist. May return null if a value index cannot be 
computed for the supplied value type.
+   * Get the {@link ImmutableBitmap} corresponding to rows matching the 
supplied value.  Generates an empty bitmap when
+   * passed a value that doesn't exist. May return null if a value index 
cannot be computed for the supplied value type.
+   *
+   * Does not match null; use {@link NullValueIndex} to match nulls.
    *
    * @param value       value to match
    * @param valueType   type of the value to match, used to assist conversion 
from the match value type to the column
@@ -40,5 +43,5 @@ public interface ValueIndexes
    *                    connot be computed for the supplied value type
    */
   @Nullable
-  BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType);
+  BitmapColumnIndex forValue(@Nonnull Object value, TypeSignature<ValueType> 
valueType);
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
index 5624025a20..02c1f6e0a0 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
@@ -65,6 +65,7 @@ import 
org.apache.druid.segment.index.semantic.StringValueSetIndexes;
 import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -228,11 +229,12 @@ public class ScalarDoubleColumnAndIndexSupplier 
implements Supplier<NestedCommon
   {
     @Nullable
     @Override
-    public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> 
valueType)
+    public BitmapColumnIndex forValue(@Nonnull Object value, 
TypeSignature<ValueType> valueType)
     {
       final ExprEval<?> eval = 
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
                                        .castTo(ExpressionType.DOUBLE);
       if (eval.isNumericNull()) {
+        // value wasn't null, but not a number?
         return null;
       }
       final double doubleValue = eval.asDouble();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
index b097f542cf..c3a23ac5e6 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
@@ -19,55 +19,28 @@
 
 package org.apache.druid.segment.nested;
 
-import com.google.common.base.Preconditions;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
-import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.ColumnarDoublesSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressionFactory;
-import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.FixedIndexedWriter;
-import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.WritableByteChannel;
 
 /**
  * Serializer for a {@link ScalarDoubleColumn}
  */
-public class ScalarDoubleColumnSerializer extends 
NestedCommonFormatColumnSerializer
+public class ScalarDoubleColumnSerializer extends 
ScalarNestedCommonFormatColumnSerializer<Double>
 {
-  private static final Logger log = new 
Logger(ScalarDoubleColumnSerializer.class);
-
-  private final String name;
-  private final SegmentWriteOutMedium segmentWriteOutMedium;
-  private final IndexSpec indexSpec;
-  @SuppressWarnings("unused")
-  private final Closer closer;
-  private DictionaryIdLookup dictionaryIdLookup;
-  private FixedIndexedWriter<Double> doubleDictionaryWriter;
-  private int rowCount = 0;
-  private boolean closedForWrite = false;
-  private boolean dictionarySerialized = false;
-
-  private SingleValueColumnarIntsSerializer encodedValueSerializer;
   private ColumnarDoublesSerializer doublesSerializer;
-  private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
-  private MutableBitmap[] bitmaps;
-  private ByteBuffer columnNameBytes = null;
 
   public ScalarDoubleColumnSerializer(
       String name,
@@ -76,54 +49,35 @@ public class ScalarDoubleColumnSerializer extends 
NestedCommonFormatColumnSerial
       Closer closer
   )
   {
-    this.name = name;
-    this.segmentWriteOutMedium = segmentWriteOutMedium;
-    this.indexSpec = indexSpec;
-    this.closer = closer;
-    this.dictionaryIdLookup = new DictionaryIdLookup();
-  }
-
-  @Override
-  public String getColumnName()
-  {
-    return name;
+    super(name, DOUBLE_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, 
closer);
   }
 
   @Override
-  public DictionaryIdLookup getGlobalLookup()
+  protected int processValue(@Nullable Object rawValue) throws IOException
   {
-    return dictionaryIdLookup;
+    final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
+    final double val = eval.asDouble();
+    final int dictId = eval.isNumericNull() ? 0 : 
dictionaryIdLookup.lookupDouble(val);
+    doublesSerializer.add(dictId == 0 ? 0.0 : val);
+    return dictId;
   }
 
   @Override
-  public boolean hasNulls()
+  public void openDictionaryWriter() throws IOException
   {
-    return !bitmaps[0].isEmpty();
+    dictionaryWriter = new FixedIndexedWriter<>(
+        segmentWriteOutMedium,
+        ColumnType.DOUBLE.getStrategy(),
+        ByteOrder.nativeOrder(),
+        Long.BYTES,
+        true
+    );
+    dictionaryWriter.open();
   }
 
   @Override
-  public void open() throws IOException
+  protected void openValueColumnSerializer() throws IOException
   {
-    if (!dictionarySerialized) {
-      throw new IllegalStateException("Dictionary not serialized, cannot open 
value serializer");
-    }
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-    encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        doubleDictionaryWriter.getCardinality(),
-        compressionToUse
-    );
-    encodedValueSerializer.open();
-
     doublesSerializer = CompressionFactory.getDoubleSerializer(
         name,
         segmentWriteOutMedium,
@@ -132,31 +86,6 @@ public class ScalarDoubleColumnSerializer extends 
NestedCommonFormatColumnSerial
         indexSpec.getDimensionCompression()
     );
     doublesSerializer.open();
-
-    bitmapIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    bitmaps = new MutableBitmap[doubleDictionaryWriter.getCardinality()];
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-  }
-
-  @Override
-  public void openDictionaryWriter() throws IOException
-  {
-    doubleDictionaryWriter = new FixedIndexedWriter<>(
-        segmentWriteOutMedium,
-        ColumnType.DOUBLE.getStrategy(),
-        ByteOrder.nativeOrder(),
-        Long.BYTES,
-        true
-    );
-    doubleDictionaryWriter.open();
   }
 
   @Override
@@ -168,81 +97,26 @@ public class ScalarDoubleColumnSerializer extends 
NestedCommonFormatColumnSerial
   ) throws IOException
   {
     if (dictionarySerialized) {
-      throw new ISE("String dictionary already serialized for column [%s], 
cannot serialize again", name);
+      throw new ISE("Double dictionary already serialized for column [%s], 
cannot serialize again", name);
     }
 
     // null is always 0
-    doubleDictionaryWriter.write(null);
+    dictionaryWriter.write(null);
     dictionaryIdLookup.addNumericNull();
 
     for (Double value : doubles) {
       if (value == null) {
         continue;
       }
-      doubleDictionaryWriter.write(value);
+      dictionaryWriter.write(value);
       dictionaryIdLookup.addDouble(value);
     }
     dictionarySerialized = true;
   }
 
   @Override
-  public void serialize(ColumnValueSelector<? extends StructuredData> 
selector) throws IOException
+  protected void writeValueColumn(FileSmoosher smoosher) throws IOException
   {
-    if (!dictionarySerialized) {
-      throw new ISE("Must serialize value dictionaries before serializing 
values for column [%s]", name);
-    }
-
-    final Object value = StructuredData.unwrap(selector.getObject());
-    final ExprEval<?> eval = ExprEval.bestEffortOf(value);
-
-    final double val = eval.asDouble();
-    final int dictId = eval.isNumericNull() ? 0 : 
dictionaryIdLookup.lookupDouble(val);
-    encodedValueSerializer.addValue(dictId);
-    doublesSerializer.add(dictId == 0 ? 0.0 : val);
-    bitmaps[dictId].add(rowCount);
-    rowCount++;
-  }
-
-
-  private void closeForWrite() throws IOException
-  {
-    if (!closedForWrite) {
-      for (int i = 0; i < bitmaps.length; i++) {
-        final MutableBitmap bitmap = bitmaps[i];
-        bitmapIndexWriter.write(
-            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
-        );
-        bitmaps[i] = null; // Reclaim memory
-      }
-      columnNameBytes = computeFilenameBytes();
-      closedForWrite = true;
-    }
-  }
-
-  @Override
-  public long getSerializedSize() throws IOException
-  {
-    closeForWrite();
-
-    long size = 1 + columnNameBytes.capacity();
-    // the value dictionaries, raw column, and null index are all stored in 
separate files
-    return size;
-  }
-
-  @Override
-  public void writeTo(
-      WritableByteChannel channel,
-      FileSmoosher smoosher
-  ) throws IOException
-  {
-    Preconditions.checkState(closedForWrite, "Not closed yet!");
-
-    writeV0Header(channel, columnNameBytes);
-    writeInternal(smoosher, doubleDictionaryWriter, 
DOUBLE_DICTIONARY_FILE_NAME);
-    writeInternal(smoosher, encodedValueSerializer, 
ENCODED_VALUE_COLUMN_FILE_NAME);
     writeInternal(smoosher, doublesSerializer, DOUBLE_VALUE_COLUMN_FILE_NAME);
-    writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
-
-    log.info("Column [%s] serialized successfully.", name);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
index a5170ca9a5..9682c781e5 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
@@ -64,6 +64,7 @@ import 
org.apache.druid.segment.index.semantic.StringValueSetIndexes;
 import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -228,11 +229,13 @@ public class ScalarLongColumnAndIndexSupplier implements 
Supplier<NestedCommonFo
   {
     @Nullable
     @Override
-    public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> 
valueType)
+    public BitmapColumnIndex forValue(@Nonnull Object value, 
TypeSignature<ValueType> valueType)
     {
+
       final ExprEval<?> eval = 
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
                                        .castTo(ExpressionType.LONG);
       if (eval.isNumericNull()) {
+        // value wasn't null, but not a number
         return null;
       }
       final long longValue = eval.asLong();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
index b3ade2f0e2..4d5604851d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
@@ -19,55 +19,28 @@
 
 package org.apache.druid.segment.nested;
 
-import com.google.common.base.Preconditions;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
-import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.ColumnarLongsSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressionFactory;
-import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.FixedIndexedWriter;
-import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.WritableByteChannel;
 
 /**
  * Serializer for a {@link ScalarLongColumn}
  */
-public class ScalarLongColumnSerializer extends 
NestedCommonFormatColumnSerializer
+public class ScalarLongColumnSerializer extends 
ScalarNestedCommonFormatColumnSerializer<Long>
 {
-  private static final Logger log = new 
Logger(ScalarLongColumnSerializer.class);
-
-  private final String name;
-  private final SegmentWriteOutMedium segmentWriteOutMedium;
-  private final IndexSpec indexSpec;
-  @SuppressWarnings("unused")
-  private final Closer closer;
-  private DictionaryIdLookup dictionaryIdLookup;
-  private FixedIndexedWriter<Long> longDictionaryWriter;
-  private int rowCount = 0;
-  private boolean closedForWrite = false;
-  private boolean dictionarySerialized = false;
-
-  private SingleValueColumnarIntsSerializer encodedValueSerializer;
   private ColumnarLongsSerializer longsSerializer;
-  private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
-  private MutableBitmap[] bitmaps;
-  private ByteBuffer columnNameBytes = null;
 
   public ScalarLongColumnSerializer(
       String name,
@@ -76,54 +49,36 @@ public class ScalarLongColumnSerializer extends 
NestedCommonFormatColumnSerializ
       Closer closer
   )
   {
-    this.name = name;
-    this.segmentWriteOutMedium = segmentWriteOutMedium;
-    this.indexSpec = indexSpec;
-    this.closer = closer;
-    this.dictionaryIdLookup = new DictionaryIdLookup();
+    super(name, LONG_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, 
closer);
   }
 
   @Override
-  public String getColumnName()
+  protected int processValue(@Nullable Object rawValue) throws IOException
   {
-    return name;
-  }
+    final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
 
-  @Override
-  public DictionaryIdLookup getGlobalLookup()
-  {
-    return dictionaryIdLookup;
+    final long val = eval.asLong();
+    final int dictId = eval.isNumericNull() ? 0 : 
dictionaryIdLookup.lookupLong(val);
+    longsSerializer.add(dictId == 0 ? 0L : val);
+    return dictId;
   }
 
   @Override
-  public boolean hasNulls()
+  public void openDictionaryWriter() throws IOException
   {
-    return !bitmaps[0].isEmpty();
+    dictionaryWriter = new FixedIndexedWriter<>(
+        segmentWriteOutMedium,
+        ColumnType.LONG.getStrategy(),
+        ByteOrder.nativeOrder(),
+        Long.BYTES,
+        true
+    );
+    dictionaryWriter.open();
   }
 
   @Override
-  public void open() throws IOException
+  protected void openValueColumnSerializer() throws IOException
   {
-    if (!dictionarySerialized) {
-      throw new IllegalStateException("Dictionary not serialized, cannot open 
value serializer");
-    }
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-    encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        longDictionaryWriter.getCardinality(),
-        compressionToUse
-    );
-    encodedValueSerializer.open();
-
     longsSerializer = CompressionFactory.getLongSerializer(
         name,
         segmentWriteOutMedium,
@@ -133,34 +88,8 @@ public class ScalarLongColumnSerializer extends 
NestedCommonFormatColumnSerializ
         indexSpec.getDimensionCompression()
     );
     longsSerializer.open();
-
-    bitmapIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    bitmaps = new MutableBitmap[longDictionaryWriter.getCardinality()];
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-  }
-
-  @Override
-  public void openDictionaryWriter() throws IOException
-  {
-    longDictionaryWriter = new FixedIndexedWriter<>(
-        segmentWriteOutMedium,
-        ColumnType.LONG.getStrategy(),
-        ByteOrder.nativeOrder(),
-        Long.BYTES,
-        true
-    );
-    longDictionaryWriter.open();
   }
 
-
   @Override
   public void serializeDictionaries(
       Iterable<String> strings,
@@ -170,81 +99,26 @@ public class ScalarLongColumnSerializer extends 
NestedCommonFormatColumnSerializ
   ) throws IOException
   {
     if (dictionarySerialized) {
-      throw new ISE("String dictionary already serialized for column [%s], 
cannot serialize again", name);
+      throw new ISE("Long dictionary already serialized for column [%s], 
cannot serialize again", name);
     }
 
     // null is always 0
-    longDictionaryWriter.write(null);
+    dictionaryWriter.write(null);
     dictionaryIdLookup.addNumericNull();
 
     for (Long value : longs) {
       if (value == null) {
         continue;
       }
-      longDictionaryWriter.write(value);
+      dictionaryWriter.write(value);
       dictionaryIdLookup.addLong(value);
     }
     dictionarySerialized = true;
   }
 
   @Override
-  public void serialize(ColumnValueSelector<? extends StructuredData> 
selector) throws IOException
+  protected void writeValueColumn(FileSmoosher smoosher) throws IOException
   {
-    if (!dictionarySerialized) {
-      throw new ISE("Must serialize value dictionaries before serializing 
values for column [%s]", name);
-    }
-
-    final Object value = StructuredData.unwrap(selector.getObject());
-    final ExprEval<?> eval = ExprEval.bestEffortOf(value);
-
-    final long val = eval.asLong();
-    final int dictId = eval.isNumericNull() ? 0 : 
dictionaryIdLookup.lookupLong(val);
-    encodedValueSerializer.addValue(dictId);
-    longsSerializer.add(dictId == 0 ? 0L : val);
-    bitmaps[dictId].add(rowCount);
-    rowCount++;
-  }
-
-
-  private void closeForWrite() throws IOException
-  {
-    if (!closedForWrite) {
-      for (int i = 0; i < bitmaps.length; i++) {
-        final MutableBitmap bitmap = bitmaps[i];
-        bitmapIndexWriter.write(
-            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
-        );
-        bitmaps[i] = null; // Reclaim memory
-      }
-      columnNameBytes = computeFilenameBytes();
-      closedForWrite = true;
-    }
-  }
-
-  @Override
-  public long getSerializedSize() throws IOException
-  {
-    closeForWrite();
-
-    long size = 1 + columnNameBytes.capacity();
-    // the value dictionaries, raw column, and null index are all stored in 
separate files
-    return size;
-  }
-
-  @Override
-  public void writeTo(
-      WritableByteChannel channel,
-      FileSmoosher smoosher
-  ) throws IOException
-  {
-    Preconditions.checkState(closedForWrite, "Not closed yet!");
-
-    writeV0Header(channel, columnNameBytes);
-    writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
-    writeInternal(smoosher, encodedValueSerializer, 
ENCODED_VALUE_COLUMN_FILE_NAME);
     writeInternal(smoosher, longsSerializer, LONG_VALUE_COLUMN_FILE_NAME);
-    writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
-
-    log.info("Column [%s] serialized successfully.", name);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
similarity index 58%
copy from 
processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
copy to 
processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
index 909d0d50f4..f3ed96942e 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
@@ -20,69 +20,85 @@
 package org.apache.druid.segment.nested;
 
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
-import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.column.StringEncodingStrategies;
-import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
 import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.DictionaryWriter;
+import org.apache.druid.segment.data.FixedIndexedIntWriter;
 import org.apache.druid.segment.data.GenericIndexedWriter;
 import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-/**
- * Serializer for a string {@link NestedCommonFormatColumn} that can be read 
with
- * {@link StringUtf8DictionaryEncodedColumn}.
- */
-public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerializer
+public abstract class ScalarNestedCommonFormatColumnSerializer<T> extends 
NestedCommonFormatColumnSerializer
 {
-  private static final Logger log = new 
Logger(ScalarStringColumnSerializer.class);
+  protected static final Logger log = new 
Logger(ScalarNestedCommonFormatColumnSerializer.class);
 
-  private final String name;
-  private final SegmentWriteOutMedium segmentWriteOutMedium;
-  private final IndexSpec indexSpec;
+  protected final String name;
+  protected final SegmentWriteOutMedium segmentWriteOutMedium;
+  protected final IndexSpec indexSpec;
   @SuppressWarnings("unused")
-  private final Closer closer;
-  private DictionaryIdLookup dictionaryIdLookup;
-  private DictionaryWriter<String> dictionaryWriter;
-  private int rowCount = 0;
-  private boolean closedForWrite = false;
-  private boolean dictionarySerialized = false;
-
-  private SingleValueColumnarIntsSerializer encodedValueSerializer;
-  private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
-  private MutableBitmap[] bitmaps;
-  private ByteBuffer columnNameBytes = null;
-
-  public ScalarStringColumnSerializer(
+  protected final Closer closer;
+  protected final String dictionaryFileName;
+
+  protected DictionaryIdLookup dictionaryIdLookup;
+  protected DictionaryWriter<T> dictionaryWriter;
+  protected boolean closedForWrite = false;
+  protected boolean dictionarySerialized = false;
+  protected FixedIndexedIntWriter intermediateValueWriter;
+  protected ByteBuffer columnNameBytes = null;
+
+  protected boolean hasNulls;
+
+
+  public ScalarNestedCommonFormatColumnSerializer(
       String name,
+      String dictionaryFileName,
       IndexSpec indexSpec,
       SegmentWriteOutMedium segmentWriteOutMedium,
       Closer closer
   )
   {
     this.name = name;
+    this.dictionaryFileName = dictionaryFileName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.indexSpec = indexSpec;
     this.closer = closer;
     this.dictionaryIdLookup = new DictionaryIdLookup();
   }
 
+  /**
+   * Called during {@link #serialize(ColumnValueSelector)} to convert value to 
dictionary id.
+   * <p>
+   * Implementations may optionally also serialize the value to a type 
specific value column if they opened one with
+   * {@link #openValueColumnSerializer()}, or do whatever else is useful to do 
while handling a single row value.
+   */
+  protected abstract int processValue(@Nullable Object rawValue) throws 
IOException;
+
+  /**
+   * Called during {@link #open()} to allow opening any separate type specific 
value column serializers
+   */
+  protected abstract void openValueColumnSerializer() throws IOException;
+
+  /**
+   * Called during {@link #writeTo(WritableByteChannel, FileSmoosher)} to 
allow any type specific value column
+   * serializers to use the {@link FileSmoosher} to write stuff to places.
+   */
+  protected abstract void writeValueColumn(FileSmoosher smoosher) throws 
IOException;
+
   @Override
   public String getColumnName()
   {
@@ -98,18 +114,7 @@ public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerial
   @Override
   public boolean hasNulls()
   {
-    return !bitmaps[0].isEmpty();
-  }
-
-  @Override
-  public void openDictionaryWriter() throws IOException
-  {
-    dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
-        indexSpec.getStringDictionaryEncoding(),
-        segmentWriteOutMedium,
-        name
-    );
-    dictionaryWriter.open();
+    return hasNulls;
   }
 
   @Override
@@ -118,62 +123,9 @@ public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerial
     if (!dictionarySerialized) {
       throw new IllegalStateException("Dictionary not serialized, cannot open 
value serializer");
     }
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      // always compress
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-    encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        dictionaryWriter.getCardinality(),
-        compressionToUse
-    );
-    encodedValueSerializer.open();
-
-    bitmapIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-  }
-
-  @Override
-  public void serializeDictionaries(
-      Iterable<String> strings,
-      Iterable<Long> longs,
-      Iterable<Double> doubles,
-      Iterable<int[]> arrays
-  ) throws IOException
-  {
-    if (dictionarySerialized) {
-      throw new ISE("String dictionary already serialized for column [%s], 
cannot serialize again", name);
-    }
-
-    // null is always 0
-    dictionaryWriter.write(null);
-    dictionaryIdLookup.addString(null);
-    for (String value : strings) {
-      value = NullHandling.emptyToNullIfNeeded(value);
-      if (value == null) {
-        continue;
-      }
-
-      dictionaryWriter.write(value);
-      dictionaryIdLookup.addString(value);
-    }
-    dictionarySerialized = true;
+    intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, 
false);
+    intermediateValueWriter.open();
+    openValueColumnSerializer();
   }
 
   @Override
@@ -184,31 +136,21 @@ public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerial
     }
 
     final Object value = StructuredData.unwrap(selector.getObject());
-    final ExprEval<?> eval = ExprEval.bestEffortOf(value);
-    final String s = eval.castTo(ExpressionType.STRING).asString();
-    final int dictId = dictionaryIdLookup.lookupString(s);
-    encodedValueSerializer.addValue(dictId);
-    bitmaps[dictId].add(rowCount);
-    rowCount++;
+    final int dictId = processValue(value);
+    intermediateValueWriter.write(dictId);
+    hasNulls = hasNulls || dictId == 0;
   }
 
-  private void closeForWrite() throws IOException
+  private void closeForWrite()
   {
     if (!closedForWrite) {
-      for (int i = 0; i < bitmaps.length; i++) {
-        final MutableBitmap bitmap = bitmaps[i];
-        bitmapIndexWriter.write(
-            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
-        );
-        bitmaps[i] = null; // Reclaim memory
-      }
       columnNameBytes = computeFilenameBytes();
       closedForWrite = true;
     }
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     closeForWrite();
 
@@ -227,9 +169,60 @@ public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerial
     Preconditions.checkState(closedForWrite, "Not closed yet!");
     Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not 
sorted?!?");
 
+    // write out compressed dictionaryId int column and bitmap indexes by 
iterating intermediate value column
+    // the intermediate value column should be replaced someday by a cooler 
compressed int column writer that allows
+    // easy iteration of the values it writes out, so that we could just build 
the bitmap indexes here instead of
+    // doing both things
+    String filenameBase = StringUtils.format("%s.forward_dim", name);
+    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
+    final CompressionStrategy compressionToUse;
+    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
+      compressionToUse = compression;
+    } else {
+      compressionToUse = CompressionStrategy.LZ4;
+    }
+    final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
+        name,
+        segmentWriteOutMedium,
+        filenameBase,
+        dictionaryWriter.getCardinality(),
+        compressionToUse
+    );
+    encodedValueSerializer.open();
+
+    final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
+        segmentWriteOutMedium,
+        name,
+        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+    );
+    bitmapIndexWriter.open();
+    bitmapIndexWriter.setObjectsNotSorted();
+    final MutableBitmap[] bitmaps;
+    bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
+    for (int i = 0; i < bitmaps.length; i++) {
+      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+    }
+
+    final IntIterator rows = intermediateValueWriter.getIterator();
+    int rowCount = 0;
+    while (rows.hasNext()) {
+      final int dictId = rows.nextInt();
+      encodedValueSerializer.addValue(dictId);
+      bitmaps[dictId].add(rowCount++);
+    }
+
+    for (int i = 0; i < bitmaps.length; i++) {
+      final MutableBitmap bitmap = bitmaps[i];
+      bitmapIndexWriter.write(
+          
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+      );
+      bitmaps[i] = null; // Reclaim memory
+    }
+
     writeV0Header(channel, columnNameBytes);
-    writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
+    writeInternal(smoosher, dictionaryWriter, dictionaryFileName);
     writeInternal(smoosher, encodedValueSerializer, 
ENCODED_VALUE_COLUMN_FILE_NAME);
+    writeValueColumn(smoosher);
     writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
 
     log.info("Column [%s] serialized successfully.", name);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
index 909d0d50f4..f28db6e1a0 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
@@ -19,56 +19,26 @@
 
 package org.apache.druid.segment.nested;
 
-import com.google.common.base.Preconditions;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
-import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExpressionType;
-import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.StringEncodingStrategies;
 import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
-import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.DictionaryWriter;
-import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
 
 /**
  * Serializer for a string {@link NestedCommonFormatColumn} that can be read 
with
  * {@link StringUtf8DictionaryEncodedColumn}.
  */
-public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerializer
+public class ScalarStringColumnSerializer extends 
ScalarNestedCommonFormatColumnSerializer<String>
 {
-  private static final Logger log = new 
Logger(ScalarStringColumnSerializer.class);
-
-  private final String name;
-  private final SegmentWriteOutMedium segmentWriteOutMedium;
-  private final IndexSpec indexSpec;
-  @SuppressWarnings("unused")
-  private final Closer closer;
-  private DictionaryIdLookup dictionaryIdLookup;
-  private DictionaryWriter<String> dictionaryWriter;
-  private int rowCount = 0;
-  private boolean closedForWrite = false;
-  private boolean dictionarySerialized = false;
-
-  private SingleValueColumnarIntsSerializer encodedValueSerializer;
-  private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
-  private MutableBitmap[] bitmaps;
-  private ByteBuffer columnNameBytes = null;
-
   public ScalarStringColumnSerializer(
       String name,
       IndexSpec indexSpec,
@@ -76,29 +46,16 @@ public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerial
       Closer closer
   )
   {
-    this.name = name;
-    this.segmentWriteOutMedium = segmentWriteOutMedium;
-    this.indexSpec = indexSpec;
-    this.closer = closer;
-    this.dictionaryIdLookup = new DictionaryIdLookup();
-  }
-
-  @Override
-  public String getColumnName()
-  {
-    return name;
-  }
-
-  @Override
-  public DictionaryIdLookup getGlobalLookup()
-  {
-    return dictionaryIdLookup;
+    super(name, STRING_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium, 
closer);
   }
 
   @Override
-  public boolean hasNulls()
+  protected int processValue(@Nullable Object rawValue)
   {
-    return !bitmaps[0].isEmpty();
+    final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
+    final String s = eval.castTo(ExpressionType.STRING).asString();
+    final int dictId = dictionaryIdLookup.lookupString(s);
+    return dictId;
   }
 
   @Override
@@ -113,40 +70,9 @@ public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerial
   }
 
   @Override
-  public void open() throws IOException
+  protected void openValueColumnSerializer()
   {
-    if (!dictionarySerialized) {
-      throw new IllegalStateException("Dictionary not serialized, cannot open 
value serializer");
-    }
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      // always compress
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-    encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        dictionaryWriter.getCardinality(),
-        compressionToUse
-    );
-    encodedValueSerializer.open();
-
-    bitmapIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
+    // no extra value column for strings
   }
 
   @Override
@@ -177,61 +103,8 @@ public class ScalarStringColumnSerializer extends 
NestedCommonFormatColumnSerial
   }
 
   @Override
-  public void serialize(ColumnValueSelector<? extends StructuredData> 
selector) throws IOException
+  protected void writeValueColumn(FileSmoosher smoosher)
   {
-    if (!dictionarySerialized) {
-      throw new ISE("Must serialize value dictionaries before serializing 
values for column [%s]", name);
-    }
-
-    final Object value = StructuredData.unwrap(selector.getObject());
-    final ExprEval<?> eval = ExprEval.bestEffortOf(value);
-    final String s = eval.castTo(ExpressionType.STRING).asString();
-    final int dictId = dictionaryIdLookup.lookupString(s);
-    encodedValueSerializer.addValue(dictId);
-    bitmaps[dictId].add(rowCount);
-    rowCount++;
-  }
-
-  private void closeForWrite() throws IOException
-  {
-    if (!closedForWrite) {
-      for (int i = 0; i < bitmaps.length; i++) {
-        final MutableBitmap bitmap = bitmaps[i];
-        bitmapIndexWriter.write(
-            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
-        );
-        bitmaps[i] = null; // Reclaim memory
-      }
-      columnNameBytes = computeFilenameBytes();
-      closedForWrite = true;
-    }
-  }
-
-  @Override
-  public long getSerializedSize() throws IOException
-  {
-    closeForWrite();
-
-    // standard string version
-    long size = 1 + columnNameBytes.capacity();
-    // the value dictionaries, raw column, and null index are all stored in 
separate files
-    return size;
-  }
-
-  @Override
-  public void writeTo(
-      WritableByteChannel channel,
-      FileSmoosher smoosher
-  ) throws IOException
-  {
-    Preconditions.checkState(closedForWrite, "Not closed yet!");
-    Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not 
sorted?!?");
-
-    writeV0Header(channel, columnNameBytes);
-    writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
-    writeInternal(smoosher, encodedValueSerializer, 
ENCODED_VALUE_COLUMN_FILE_NAME);
-    writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
-
-    log.info("Column [%s] serialized successfully.", name);
+    // no extra value column for strings
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
index 5fabfefef6..f67123123d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.ExpressionType;
 import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.segment.column.ColumnBuilder;
@@ -47,13 +48,16 @@ import 
org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.VByte;
+import org.apache.druid.segment.index.AllFalseBitmapColumnIndex;
 import org.apache.druid.segment.index.BitmapColumnIndex;
 import org.apache.druid.segment.index.SimpleBitmapColumnIndex;
 import org.apache.druid.segment.index.SimpleImmutableBitmapIndex;
+import org.apache.druid.segment.index.semantic.ArrayElementIndexes;
 import org.apache.druid.segment.index.semantic.NullValueIndex;
 import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -93,6 +97,7 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
         final Supplier<FixedIndexed<Long>> longDictionarySupplier;
         final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
         final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
+        final Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier;
 
         final ByteBuffer stringDictionaryBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
             mapper,
@@ -147,6 +152,11 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
             columnName,
             NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
         );
+        final ByteBuffer arrayElementDictionaryBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
+            mapper,
+            columnName,
+            
NestedCommonFormatColumnSerializer.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
+        );
         final ByteBuffer valueIndexBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
             mapper,
             columnName,
@@ -180,6 +190,7 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
             byteOrder,
             Double.BYTES
         );
+
         final ByteBuffer arrayDictionarybuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
             mapper,
             columnName,
@@ -193,6 +204,12 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
         try (ColumnarInts throwAway = ints.get()) {
           size = throwAway.size();
         }
+        arrayElementDictionarySupplier = FixedIndexed.read(
+            arrayElementDictionaryBuffer,
+            CompressedNestedDataComplexColumn.INT_TYPE_STRATEGY,
+            byteOrder,
+            Integer.BYTES
+        );
         return new VariantColumnAndIndexSupplier(
             logicalType,
             variantTypeByte,
@@ -201,6 +218,7 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
             longDictionarySupplier,
             doubleDictionarySupplier,
             arrayDictionarySupplier,
+            arrayElementDictionarySupplier,
             ints,
             valueIndexes,
             arrayElementIndexes,
@@ -222,11 +240,11 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
   @Nullable
   private final Byte variantTypeSetByte;
   private final BitmapFactory bitmapFactory;
-  private final GenericIndexed<ByteBuffer> stringDictionary;
-  private final Supplier<FrontCodedIndexed> frontCodedStringDictionarySupplier;
+  private final Supplier<? extends Indexed<ByteBuffer>> 
stringDictionarySupplier;
   private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
   private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
   private final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
+  private final Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier;
   private final Supplier<ColumnarInts> encodedValueColumnSupplier;
   @SuppressWarnings("unused")
   private final GenericIndexed<ImmutableBitmap> valueIndexes;
@@ -242,6 +260,7 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
       Supplier<FixedIndexed<Long>> longDictionarySupplier,
       Supplier<FixedIndexed<Double>> doubleDictionarySupplier,
       Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier,
+      Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier,
       Supplier<ColumnarInts> encodedValueColumnSupplier,
       GenericIndexed<ImmutableBitmap> valueIndexes,
       GenericIndexed<ImmutableBitmap> elementIndexes,
@@ -252,11 +271,13 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
   {
     this.logicalType = logicalType;
     this.variantTypeSetByte = variantTypeSetByte;
-    this.stringDictionary = stringDictionary;
-    this.frontCodedStringDictionarySupplier = 
frontCodedStringDictionarySupplier;
+    stringDictionarySupplier = frontCodedStringDictionarySupplier != null
+                               ? frontCodedStringDictionarySupplier
+                               : stringDictionary::singleThreaded;
     this.longDictionarySupplier = longDictionarySupplier;
     this.doubleDictionarySupplier = doubleDictionarySupplier;
     this.arrayDictionarySupplier = arrayDictionarySupplier;
+    this.arrayElementDictionarySupplier = arrayElementDictionarySupplier;
     this.encodedValueColumnSupplier = encodedValueColumnSupplier;
     this.valueIndexes = valueIndexes;
     this.arrayElementIndexes = elementIndexes;
@@ -273,20 +294,8 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
   @Override
   public NestedCommonFormatColumn get()
   {
-    if (frontCodedStringDictionarySupplier != null) {
-      return new VariantColumn<>(
-          frontCodedStringDictionarySupplier.get(),
-          longDictionarySupplier.get(),
-          doubleDictionarySupplier.get(),
-          arrayDictionarySupplier.get(),
-          encodedValueColumnSupplier.get(),
-          nullValueBitmap,
-          logicalType,
-          variantTypeSetByte
-      );
-    }
     return new VariantColumn<>(
-        stringDictionary.singleThreaded(),
+        stringDictionarySupplier.get(),
         longDictionarySupplier.get(),
         doubleDictionarySupplier.get(),
         arrayDictionarySupplier.get(),
@@ -306,6 +315,8 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
       return (T) (NullValueIndex) () -> nullIndex;
     } else if (clazz.equals(ValueIndexes.class) && variantTypeSetByte == null 
&& logicalType.isArray()) {
       return (T) new ArrayValueIndexes();
+    } else if (clazz.equals(ArrayElementIndexes.class) && variantTypeSetByte 
== null && logicalType.isArray()) {
+      return (T) new VariantArrayElementIndexes();
     }
     return null;
   }
@@ -320,30 +331,42 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
     return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
   }
 
+  private ImmutableBitmap getElementBitmap(int idx)
+  {
+    if (idx < 0) {
+      return bitmapFactory.makeEmptyImmutableBitmap();
+    }
+    final int elementDictionaryIndex = 
arrayElementDictionarySupplier.get().indexOf(idx);
+    if (elementDictionaryIndex < 0) {
+      return bitmapFactory.makeEmptyImmutableBitmap();
+    }
+    final ImmutableBitmap bitmap = 
arrayElementIndexes.get(elementDictionaryIndex);
+    return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
+  }
+
   private class ArrayValueIndexes implements ValueIndexes
   {
     @Nullable
     @Override
-    public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> 
valueType)
+    public BitmapColumnIndex forValue(@Nonnull Object value, 
TypeSignature<ValueType> valueType)
     {
       final ExprEval<?> eval = 
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
                                        
.castTo(ExpressionType.fromColumnTypeStrict(logicalType));
-      if (eval.value() == null) {
-        return null;
-      }
       final Object[] arrayToMatch = eval.asArray();
       Indexed elements;
+      final int elementOffset;
       switch (logicalType.getElementType().getType()) {
         case STRING:
-          elements = frontCodedStringDictionarySupplier != null
-                     ? frontCodedStringDictionarySupplier.get()
-                     : stringDictionary.singleThreaded();
+          elements = stringDictionarySupplier.get();
+          elementOffset = 0;
           break;
         case LONG:
           elements = longDictionarySupplier.get();
+          elementOffset = stringDictionarySupplier.get().size();
           break;
         case DOUBLE:
           elements = doubleDictionarySupplier.get();
+          elementOffset = stringDictionarySupplier.get().size() + 
longDictionarySupplier.get().size();
           break;
         default:
           throw DruidException.defensive(
@@ -353,30 +376,29 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
       }
 
       final int[] ids = new int[arrayToMatch.length];
-      boolean hasMissingElement = false;
+      final int arrayOffset = stringDictionarySupplier.get().size() + 
longDictionarySupplier.get().size() + doubleDictionarySupplier.get().size();
       for (int i = 0; i < arrayToMatch.length; i++) {
-        if (logicalType.getElementType().is(ValueType.STRING)) {
+        if (arrayToMatch[i] == null) {
+          ids[i] = 0;
+        } else if (logicalType.getElementType().is(ValueType.STRING)) {
           ids[i] = elements.indexOf(StringUtils.toUtf8ByteBuffer((String) 
arrayToMatch[i]));
         } else {
-          ids[i] = elements.indexOf(arrayToMatch[i]);
+          ids[i] = elements.indexOf(arrayToMatch[i]) + elementOffset;
         }
         if (ids[i] < 0) {
-          hasMissingElement = true;
-          break;
+          if (value == null) {
+            return new AllFalseBitmapColumnIndex(bitmapFactory);
+          }
         }
       }
 
-      final boolean noMatch = hasMissingElement;
       final FrontCodedIntArrayIndexed dictionary = 
arrayDictionarySupplier.get();
       return new SimpleBitmapColumnIndex()
       {
         @Override
         public double estimateSelectivity(int totalRows)
         {
-          if (noMatch) {
-            return 0.0;
-          }
-          final int id = dictionary.indexOf(ids);
+          final int id = dictionary.indexOf(ids) + arrayOffset;
           if (id < 0) {
             return 0.0;
           }
@@ -386,10 +408,7 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
         @Override
         public <T> T computeBitmapResult(BitmapResultFactory<T> 
bitmapResultFactory)
         {
-          if (noMatch) {
-            return 
bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
-          }
-          final int id = dictionary.indexOf(ids);
+          final int id = dictionary.indexOf(ids) + arrayOffset;
           if (id < 0) {
             return 
bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
           }
@@ -398,4 +417,73 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
       };
     }
   }
+
+  private class VariantArrayElementIndexes implements ArrayElementIndexes
+  {
+
+    @Nullable
+    @Override
+    public BitmapColumnIndex containsValue(@Nullable Object value, 
TypeSignature<ValueType> elementValueType)
+    {
+      final ExprEval<?> eval = 
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(elementValueType), value)
+                                       
.castTo(ExpressionType.fromColumnTypeStrict(logicalType.getElementType()));
+
+      Indexed elements;
+      final int elementOffset;
+      switch (logicalType.getElementType().getType()) {
+        case STRING:
+          elements = stringDictionarySupplier.get();
+          elementOffset = 0;
+          break;
+        case LONG:
+          elements = longDictionarySupplier.get();
+          elementOffset = stringDictionarySupplier.get().size();
+          break;
+        case DOUBLE:
+          elements = doubleDictionarySupplier.get();
+          elementOffset = stringDictionarySupplier.get().size() + 
longDictionarySupplier.get().size();
+          break;
+        default:
+          throw DruidException.defensive(
+              "Unhandled array type [%s] how did this happen?",
+              logicalType.getElementType()
+          );
+      }
+
+      return new SimpleBitmapColumnIndex()
+      {
+        @Override
+        public double estimateSelectivity(int totalRows)
+        {
+          final int elementId = getElementId();
+          if (elementId < 0) {
+            return 0.0;
+          }
+          return (double) getElementBitmap(elementId).size() / totalRows;
+        }
+
+        @Override
+        public <T> T computeBitmapResult(BitmapResultFactory<T> 
bitmapResultFactory)
+        {
+          final int elementId = getElementId();
+
+          if (elementId < 0) {
+            return 
bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
+          }
+          return 
bitmapResultFactory.wrapDimensionValue(getElementBitmap(elementId));
+        }
+
+        private int getElementId()
+        {
+          if (eval.value() == null) {
+            return 0;
+          } else if (eval.type().is(ExprType.STRING)) {
+            return 
elements.indexOf(StringUtils.toUtf8ByteBuffer(eval.asString()));
+          } else {
+            return elements.indexOf(eval.value()) + elementOffset;
+          }
+        }
+      };
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 72d5ed0ab3..e906816b04 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.nested;
 import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
@@ -71,15 +72,12 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   private FixedIndexedWriter<Double> doubleDictionaryWriter;
   private FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
   private FixedIndexedIntWriter arrayElementDictionaryWriter;
-  private int rowCount = 0;
   private boolean closedForWrite = false;
   private boolean dictionarySerialized = false;
-  private SingleValueColumnarIntsSerializer encodedValueSerializer;
-  private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
-  private GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter;
-  private MutableBitmap[] bitmaps;
+  private FixedIndexedIntWriter intermediateValueWriter;
+
   private ByteBuffer columnNameBytes = null;
-  private final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new 
Int2ObjectRBTreeMap<>();
+  private boolean hasNulls;
   @Nullable
   private final Byte variantTypeSetByte;
 
@@ -114,7 +112,7 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   @Override
   public boolean hasNulls()
   {
-    return !bitmaps[0].isEmpty();
+    return hasNulls;
   }
 
   @Override
@@ -161,45 +159,8 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
     if (!dictionarySerialized) {
       throw new IllegalStateException("Dictionary not serialized, cannot open 
value serializer");
     }
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final int cardinality = dictionaryWriter.getCardinality()
-                            + longDictionaryWriter.getCardinality()
-                            + doubleDictionaryWriter.getCardinality()
-                            + arrayDictionaryWriter.getCardinality();
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-    encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        cardinality,
-        compressionToUse
-    );
-    encodedValueSerializer.open();
-
-    bitmapIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    bitmaps = new MutableBitmap[cardinality];
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-    arrayElementIndexWriter = new GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name + "_arrays",
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    arrayElementIndexWriter.open();
-    arrayElementIndexWriter.setObjectsNotSorted();
+    intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium, 
false);
+    intermediateValueWriter.open();
   }
 
   @Override
@@ -211,7 +172,7 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   ) throws IOException
   {
     if (dictionarySerialized) {
-      throw new ISE("String dictionary already serialized for column [%s], 
cannot serialize again", name);
+      throw new ISE("Value dictionaries already serialized for column [%s], 
cannot serialize again", name);
     }
 
     // null is always 0
@@ -277,14 +238,10 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
           globalIds[i] = -1;
         }
         Preconditions.checkArgument(globalIds[i] >= 0, "unknown global id [%s] 
for value [%s]", globalIds[i], array[i]);
-        arrayElements.computeIfAbsent(
-            globalIds[i],
-            (id) -> 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
-        ).add(rowCount);
       }
       final int dictId = dictionaryIdLookup.lookupArray(globalIds);
-      encodedValueSerializer.addValue(dictId);
-      bitmaps[dictId].add(rowCount);
+      intermediateValueWriter.write(dictId);
+      hasNulls = hasNulls || dictId == 0;
     } else {
       final Object o = eval.value();
       final int dictId;
@@ -300,43 +257,21 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
         dictId = -1;
       }
       Preconditions.checkArgument(dictId >= 0, "unknown global id [%s] for 
value [%s]", dictId, o);
-      if (dictId != 0) {
-        // treat as single element array
-        arrayElements.computeIfAbsent(
-            dictId,
-            (id) -> 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
-        ).add(rowCount);
-      }
-      encodedValueSerializer.addValue(dictId);
-      bitmaps[dictId].add(rowCount);
+      intermediateValueWriter.write(dictId);
+      hasNulls = hasNulls || dictId == 0;
     }
-
-    rowCount++;
   }
 
-  private void closeForWrite() throws IOException
+  private void closeForWrite()
   {
     if (!closedForWrite) {
-      for (int i = 0; i < bitmaps.length; i++) {
-        final MutableBitmap bitmap = bitmaps[i];
-        bitmapIndexWriter.write(
-            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
-        );
-        bitmaps[i] = null; // Reclaim memory
-      }
-      for (Int2ObjectMap.Entry<MutableBitmap> arrayElement : 
arrayElements.int2ObjectEntrySet()) {
-        arrayElementDictionaryWriter.write(arrayElement.getIntKey());
-        arrayElementIndexWriter.write(
-            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
-        );
-      }
       columnNameBytes = computeFilenameBytes();
       closedForWrite = true;
     }
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     closeForWrite();
 
@@ -357,6 +292,87 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
     Preconditions.checkState(closedForWrite, "Not closed yet!");
     Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not 
sorted?!?");
 
+    // write out compressed dictionaryId int column, bitmap indexes, and array 
element bitmap indexes
+    // by iterating intermediate value column the intermediate value column 
should be replaced someday by a cooler
+    // compressed int column writer that allows easy iteration of the values 
it writes out, so that we could just
+    // build the bitmap indexes here instead of doing both things
+    String filenameBase = StringUtils.format("%s.forward_dim", name);
+    final int cardinality = dictionaryWriter.getCardinality()
+                            + longDictionaryWriter.getCardinality()
+                            + doubleDictionaryWriter.getCardinality()
+                            + arrayDictionaryWriter.getCardinality();
+    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
+    final CompressionStrategy compressionToUse;
+    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
+      compressionToUse = compression;
+    } else {
+      compressionToUse = CompressionStrategy.LZ4;
+    }
+
+    final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
+        name,
+        segmentWriteOutMedium,
+        filenameBase,
+        cardinality,
+        compressionToUse
+    );
+    encodedValueSerializer.open();
+
+    final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
+        segmentWriteOutMedium,
+        name,
+        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+    );
+    bitmapIndexWriter.open();
+    bitmapIndexWriter.setObjectsNotSorted();
+    final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+    final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new 
Int2ObjectRBTreeMap<>();
+    for (int i = 0; i < bitmaps.length; i++) {
+      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+    }
+    final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new 
GenericIndexedWriter<>(
+        segmentWriteOutMedium,
+        name + "_arrays",
+        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+    );
+    arrayElementIndexWriter.open();
+    arrayElementIndexWriter.setObjectsNotSorted();
+
+    final IntIterator rows = intermediateValueWriter.getIterator();
+    int rowCount = 0;
+    final int arrayBaseId = dictionaryWriter.getCardinality()
+                            + longDictionaryWriter.getCardinality()
+                            + doubleDictionaryWriter.getCardinality();
+    while (rows.hasNext()) {
+      final int dictId = rows.nextInt();
+      encodedValueSerializer.addValue(dictId);
+      bitmaps[dictId].add(rowCount);
+      if (dictId >= arrayBaseId) {
+        int[] array = arrayDictionaryWriter.get(dictId - arrayBaseId);
+        for (int elementId : array) {
+          arrayElements.computeIfAbsent(
+              elementId,
+              (id) -> 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
+          ).add(rowCount);
+        }
+      }
+      rowCount++;
+    }
+
+    for (int i = 0; i < bitmaps.length; i++) {
+      final MutableBitmap bitmap = bitmaps[i];
+      bitmapIndexWriter.write(
+          
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+      );
+      bitmaps[i] = null; // Reclaim memory
+    }
+    for (Int2ObjectMap.Entry<MutableBitmap> arrayElement : 
arrayElements.int2ObjectEntrySet()) {
+      arrayElementDictionaryWriter.write(arrayElement.getIntKey());
+      arrayElementIndexWriter.write(
+          
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
+      );
+    }
+
     writeV0Header(channel, columnNameBytes);
     if (variantTypeSetByte != null) {
       channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
index 383a2046db..ac7bd1c684 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
@@ -47,6 +47,7 @@ import 
org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
 import org.apache.druid.segment.index.semantic.NullValueIndex;
 import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.vector.NoFilterVectorOffset;
 import org.apache.druid.segment.vector.VectorValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -248,6 +249,7 @@ public class ScalarDoubleColumnSupplierTest extends 
InitializedNullHandlingTest
     ColumnValueSelector<?> valueSelector = 
column.makeColumnValueSelector(offset);
     VectorValueSelector vectorValueSelector = 
column.makeVectorValueSelector(vectorOffset);
 
+    ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
     StringValueSetIndexes valueSetIndex = 
supplier.as(StringValueSetIndexes.class);
     DruidPredicateIndexes predicateIndex = 
supplier.as(DruidPredicateIndexes.class);
     NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@@ -279,6 +281,7 @@ public class ScalarDoubleColumnSupplierTest extends 
InitializedNullHandlingTest
         }
 
         
Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i));
+        Assert.assertTrue(valueIndexes.forValue(row, 
ColumnType.DOUBLE).computeBitmapResult(resultFactory).get(i));
         Assert.assertTrue(valueSetIndex.forSortedValues(new 
TreeSet<>(ImmutableSet.of(String.valueOf(row))))
                                        .computeBitmapResult(resultFactory)
                                        .get(i));
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
index 87c4fcd830..ebd27fa794 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
@@ -47,6 +47,7 @@ import 
org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
 import org.apache.druid.segment.index.semantic.NullValueIndex;
 import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.vector.NoFilterVectorOffset;
 import org.apache.druid.segment.vector.VectorValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -248,6 +249,7 @@ public class ScalarLongColumnSupplierTest extends 
InitializedNullHandlingTest
     ColumnValueSelector<?> valueSelector = 
column.makeColumnValueSelector(offset);
     VectorValueSelector vectorValueSelector = 
column.makeVectorValueSelector(vectorOffset);
 
+    ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
     StringValueSetIndexes valueSetIndex = 
supplier.as(StringValueSetIndexes.class);
     DruidPredicateIndexes predicateIndex = 
supplier.as(DruidPredicateIndexes.class);
     NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@@ -279,6 +281,7 @@ public class ScalarLongColumnSupplierTest extends 
InitializedNullHandlingTest
         }
 
         
Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i));
+        Assert.assertTrue(valueIndexes.forValue(row, 
ColumnType.LONG).computeBitmapResult(resultFactory).get(i));
         Assert.assertTrue(valueSetIndex.forSortedValues(new 
TreeSet<>(ImmutableSet.of(String.valueOf(row))))
                                        .computeBitmapResult(resultFactory)
                                        .get(i));
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
index da5edc734e..8b48534ac5 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
@@ -49,6 +49,7 @@ import 
org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
 import org.apache.druid.segment.index.semantic.NullValueIndex;
 import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -248,6 +249,7 @@ public class ScalarStringColumnSupplierTest extends 
InitializedNullHandlingTest
     ColumnValueSelector<?> valueSelector = 
column.makeColumnValueSelector(offset);
     DimensionSelector dimSelector = column.makeDimensionSelector(offset, null);
 
+    ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
     StringValueSetIndexes valueSetIndex = 
supplier.as(StringValueSetIndexes.class);
     DruidPredicateIndexes predicateIndex = 
supplier.as(DruidPredicateIndexes.class);
     NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@@ -270,6 +272,7 @@ public class ScalarStringColumnSupplierTest extends 
InitializedNullHandlingTest
         
Assert.assertEquals(dimSelector.idLookup().lookupId(dimSelectorLookupVal), 
dimSelector.getRow().get(0));
 
         
Assert.assertTrue(valueSetIndex.forValue(row).computeBitmapResult(resultFactory).get(i));
+        Assert.assertTrue(valueIndexes.forValue(row, 
ColumnType.STRING).computeBitmapResult(resultFactory).get(i));
         Assert.assertTrue(valueSetIndex.forSortedValues(new 
TreeSet<>(ImmutableSet.of(row)))
                                        .computeBitmapResult(resultFactory)
                                        .get(i));
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 69fe1dc9bf..401e5d87eb 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -47,9 +47,11 @@ import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory;
 import org.apache.druid.segment.data.FrontCodedIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
+import org.apache.druid.segment.index.semantic.ArrayElementIndexes;
 import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
 import org.apache.druid.segment.index.semantic.NullValueIndex;
 import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.vector.NoFilterVectorOffset;
 import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
 import org.apache.druid.segment.vector.VectorObjectSelector;
@@ -377,6 +379,15 @@ public class VariantColumnSupplierTest extends 
InitializedNullHandlingTest
     Assert.assertNull(predicateIndex);
     NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
     Assert.assertNotNull(nullValueIndex);
+    ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
+    ArrayElementIndexes arrayElementIndexes = 
supplier.as(ArrayElementIndexes.class);
+    if (expectedType.getSingleType() != null && 
expectedType.getSingleType().isArray()) {
+      Assert.assertNotNull(valueIndexes);
+      Assert.assertNotNull(arrayElementIndexes);
+    } else {
+      Assert.assertNull(valueIndexes);
+      Assert.assertNull(arrayElementIndexes);
+    }
 
     SortedMap<String, FieldTypeInfo.MutableTypeSet> fields = 
column.getFieldTypeInfo();
     Assert.assertEquals(1, fields.size());
@@ -397,6 +408,10 @@ public class VariantColumnSupplierTest extends 
InitializedNullHandlingTest
           Assert.assertArrayEquals(((List) row).toArray(), (Object[]) 
valueSelector.getObject());
           if (expectedType.getSingleType() != null) {
             Assert.assertArrayEquals(((List) row).toArray(), (Object[]) 
vectorObjectSelector.getObjectVector()[0]);
+            Assert.assertTrue(valueIndexes.forValue(row, 
expectedType.getSingleType()).computeBitmapResult(resultFactory).get(i));
+            for (Object o : ((List) row)) {
+              Assert.assertTrue("Failed on row: " + row, 
arrayElementIndexes.containsValue(o, 
expectedType.getSingleType().getElementType()).computeBitmapResult(resultFactory).get(i));
+            }
           } else {
             // mixed type vector object selector coerces to the most common 
type
             Assert.assertArrayEquals(ExprEval.ofType(expressionType, 
row).asArray(), (Object[]) vectorObjectSelector.getObjectVector()[0]);
@@ -440,6 +455,9 @@ public class VariantColumnSupplierTest extends 
InitializedNullHandlingTest
           }
         }
         
Assert.assertTrue(nullValueIndex.get().computeBitmapResult(resultFactory).get(i));
+        if (expectedType.getSingleType() != null) {
+          Assert.assertFalse(arrayElementIndexes.containsValue(null, 
expectedType.getSingleType()).computeBitmapResult(resultFactory).get(i));
+        }
       }
 
       offset.increment();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to