This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2726c6f3882 Minor refactors to processing
2726c6f3882 is described below

commit 2726c6f38823d42f7f82d3b4e8742f7a80fff19f
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Nov 21 15:37:55 2024 +0530

    Minor refactors to processing
    
    Some refactors across druid to clean up the code and add utility functions 
where required.
---
 .../org/apache/druid/msq/test/MSQTestBase.java     |   3 +-
 .../druid/query/aggregation/SerializedStorage.java | 195 ++++++++----
 .../java/org/apache/druid/segment/IndexIO.java     |   3 +-
 .../java/org/apache/druid/segment/IndexSpec.java   |   2 +-
 .../column/StringUtf8DictionaryEncodedColumn.java  |  11 +-
 .../data/BlockLayoutColumnarDoublesSupplier.java   |  12 +-
 .../data/BlockLayoutColumnarLongsSupplier.java     |  15 +-
 .../data/CompressedVSizeColumnarIntsSupplier.java  |   7 +-
 .../druid/segment/data/CompressionFactory.java     |   1 -
 .../nested/NestedCommonFormatColumnSerializer.java |   2 +-
 .../segment/nested/NestedDataComplexTypeSerde.java |  86 +++--
 .../druid/segment/nested/ScalarDoubleColumn.java   |  27 +-
 .../nested/ScalarDoubleColumnAndIndexSupplier.java |  21 +-
 .../druid/segment/nested/ScalarLongColumn.java     |  27 +-
 .../nested/ScalarLongColumnAndIndexSupplier.java   |  21 +-
 .../nested/ScalarStringColumnAndIndexSupplier.java |   9 +-
 .../apache/druid/segment/nested/VariantColumn.java |   6 +-
 .../nested/VariantColumnAndIndexSupplier.java      |   3 +-
 .../segment/nested/VariantColumnSerializer.java    | 349 +++++++++++++--------
 .../serde/DictionaryEncodedColumnPartSerde.java    |   3 +-
 .../StringUtf8DictionaryEncodedColumnSupplier.java |  15 +-
 .../druid/segment/serde/cell/IOIterator.java       |   4 +-
 .../druid/segment/serde/cell/StagedSerde.java      |  35 +++
 .../filter/PredicateValueMatcherFactoryTest.java   |   7 +-
 .../druid/segment/filter/ValueMatchersTest.java    |  11 +-
 25 files changed, 621 insertions(+), 254 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index e99b571b807..060f8499e3e 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -511,7 +511,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
         binder -> 
binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class)),
         new JoinableFactoryModule(),
         new IndexingServiceTuningConfigModule(),
-        new MSQIndexingModule(),
         Modules.override(new MSQSqlModule()).with(
             binder -> {
               // Our Guice configuration currently requires bindings to exist 
even if they aren't ever used, the
@@ -540,6 +539,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
 
     objectMapper = setupObjectMapper(injector);
     objectMapper.registerModules(new 
StorageConnectorModule().getJacksonModules());
+    objectMapper.registerModules(new MSQIndexingModule().getJacksonModules());
     objectMapper.registerModules(sqlModule.getJacksonModules());
     objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
 
@@ -697,7 +697,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
           break;
         default:
           throw new ISE("Cannot query segment %s in test runner", segmentId);
-
       }
       Segment segment = new Segment()
       {
diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
 
b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
index a6fee46b3ca..29d03d14615 100644
--- 
a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
+++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
@@ -19,18 +19,17 @@
 
 package org.apache.druid.query.aggregation;
 
-import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.segment.serde.cell.IOIterator;
-import org.apache.druid.segment.serde.cell.IntSerializer;
 import org.apache.druid.segment.serde.cell.StagedSerde;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
 import javax.annotation.Nullable;
-import java.io.BufferedInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.IntBuffer;
 import java.util.NoSuchElementException;
 
 /**
@@ -45,109 +44,181 @@ public class SerializedStorage<T>
 {
   private final WriteOutBytes writeOutBytes;
   private final StagedSerde<T> serde;
-  private final IntSerializer intSerializer = new IntSerializer();
+  private final ByteBuffer itemOffsetsBytes;
+  private final IntBuffer itemSizes;
+
+  private final LongArrayList rowChunkOffsets = new LongArrayList();
+  private int numStored = 0;
+  private int maxSize = 0;
 
   public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
+  {
+    this(writeOutBytes, serde, 4096);
+  }
+
+  public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde, 
int chunkSize)
   {
     this.writeOutBytes = writeOutBytes;
     this.serde = serde;
+
+    this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * 
Integer.BYTES).order(ByteOrder.nativeOrder());
+    this.itemSizes = itemOffsetsBytes.asIntBuffer();
   }
 
   public void store(@Nullable T value) throws IOException
   {
     byte[] bytes = serde.serialize(value);
 
-    writeOutBytes.write(intSerializer.serialize(bytes.length));
-    writeOutBytes.write(bytes);
+    maxSize = Math.max(maxSize, bytes.length);
+    itemSizes.put(bytes.length);
+    if (bytes.length > 0) {
+      writeOutBytes.write(bytes);
+    }
+
+    ++numStored;
+    if (itemSizes.remaining() == 0) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      writeOutBytes.write(itemOffsetsBytes);
+      itemOffsetsBytes.clear();
+      itemSizes.clear();
+    }
   }
 
+  public int numStored()
+  {
+    return numStored;
+  }
+
+  /**
+   * Generates an iterator over everything that has been stored.  Also 
signifies the end of storing objects.
+   * iterator() can be called multiple times if needed, but after iterator() 
is called, store() can no longer be
+   * called.
+   *
+   * @return an iterator
+   * @throws IOException on failure
+   */
   public IOIterator<T> iterator() throws IOException
   {
-    return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde);
+    if (itemSizes.position() != itemSizes.limit()) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      itemOffsetsBytes.limit(itemSizes.position() * Integer.BYTES);
+      writeOutBytes.write(itemOffsetsBytes);
+
+      // Move the limit to the position so that we fail subsequent writes and 
indicate that we are done
+      itemSizes.limit(itemSizes.position());
+    }
+
+    return new DeserializingIOIterator<>(
+        writeOutBytes,
+        rowChunkOffsets,
+        numStored,
+        itemSizes.capacity(),
+        maxSize,
+        serde
+    );
   }
 
   private static class DeserializingIOIterator<T> implements IOIterator<T>
   {
-    private static final int NEEDS_READ = -2;
-    private static final int EOF = -1;
+    private static final ByteBuffer EMPTY_BUFFER = 
ByteBuffer.allocate(0).asReadOnlyBuffer();
 
-    private final byte[] intBytes;
-    private final BufferedInputStream inputStream;
+    private final WriteOutBytes medium;
+    private final LongArrayList rowChunkOffsets;
+    private final int numEntries;
+    private ByteBuffer tmpBuf;
     private final StagedSerde<T> serde;
 
-    private int nextSize;
-
-    public DeserializingIOIterator(InputStream inputStream, StagedSerde<T> 
serde)
+    private final ByteBuffer itemOffsetsBytes;
+    private final int[] itemSizes;
+
+    private long itemStartOffset;
+    private int chunkId = 0;
+    private int currId = 0;
+    private int itemIndex;
+
+    public DeserializingIOIterator(
+        WriteOutBytes medium,
+        LongArrayList rowChunkOffsets,
+        int numEntries,
+        int chunkSize,
+        int maxSize,
+        StagedSerde<T> serde
+    )
     {
-      this.inputStream = new BufferedInputStream(inputStream);
+      this.medium = medium;
+      this.rowChunkOffsets = rowChunkOffsets;
+      this.numEntries = numEntries;
+      this.tmpBuf = 
ByteBuffer.allocate(maxSize).order(ByteOrder.nativeOrder());
       this.serde = serde;
-      intBytes = new byte[Integer.BYTES];
-      nextSize = NEEDS_READ;
+
+      this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * 
Integer.BYTES).order(ByteOrder.nativeOrder());
+      this.itemSizes = new int[chunkSize];
+      this.itemIndex = chunkSize;
     }
 
     @Override
-    public boolean hasNext() throws IOException
+    public boolean hasNext()
     {
-      return getNextSize() > EOF;
+      return currId < numEntries;
     }
 
     @Override
     public T next() throws IOException
     {
-      int currentNextSize = getNextSize();
-
-      if (currentNextSize == -1) {
-        throw new NoSuchElementException("end of buffer reached");
+      if (currId >= numEntries) {
+        throw new NoSuchElementException();
       }
 
-      byte[] nextBytes = new byte[currentNextSize];
-      int bytesRead = 0;
-
-      while (bytesRead < currentNextSize) {
-        int result = inputStream.read(nextBytes, bytesRead, currentNextSize - 
bytesRead);
-
-        if (result == -1) {
-          throw new NoSuchElementException("unexpected end of buffer reached");
+      if (itemIndex >= itemSizes.length) {
+        if (chunkId == 0) {
+          itemStartOffset = 0;
+        } else {
+          if (itemStartOffset != rowChunkOffsets.getLong(chunkId - 1)) {
+            throw DruidException.defensive(
+                "Should have read up to the start of the offsets [%,d], "
+                + "but for some reason the values [%,d] don't align.  Possible 
corruption?",
+                rowChunkOffsets.getLong(chunkId - 1),
+                itemStartOffset
+            );
+          }
+          itemStartOffset += (((long) itemSizes.length) * Integer.BYTES);
         }
 
-        bytesRead += result;
-      }
-
-      Preconditions.checkState(bytesRead == currentNextSize);
-      T value = serde.deserialize(nextBytes);
-
-      nextSize = NEEDS_READ;
-
-      return value;
-    }
-
-    private int getNextSize() throws IOException
-    {
-      if (nextSize == NEEDS_READ) {
-        int bytesRead = 0;
-
-        while (bytesRead < Integer.BYTES) {
-          int result = inputStream.read(intBytes, bytesRead, Integer.BYTES - 
bytesRead);
+        int numToRead = Math.min(itemSizes.length, numEntries - (chunkId * 
itemSizes.length));
+        final long readOffset = rowChunkOffsets.getLong(chunkId++);
+        itemOffsetsBytes.clear();
+        itemOffsetsBytes.limit(numToRead * Integer.BYTES);
+        medium.readFully(readOffset, itemOffsetsBytes);
+        itemOffsetsBytes.flip();
+        itemOffsetsBytes.asIntBuffer().get(itemSizes, 0, numToRead);
 
-          if (result == -1) {
-            nextSize = EOF;
-            return EOF;
-          } else {
-            bytesRead += result;
-          }
-        }
-        Preconditions.checkState(bytesRead == Integer.BYTES);
+        itemIndex = 0;
+      }
 
-        nextSize = 
ByteBuffer.wrap(intBytes).order(ByteOrder.nativeOrder()).getInt();
+      int bytesToRead = itemSizes[itemIndex];
+      final T retVal;
+      if (bytesToRead == 0) {
+        retVal = serde.deserialize(EMPTY_BUFFER);
+      } else {
+        tmpBuf.clear();
+        tmpBuf.limit(bytesToRead);
+        medium.readFully(itemStartOffset, tmpBuf);
+        tmpBuf.flip();
+
+        retVal = serde.deserialize(tmpBuf);
       }
 
-      return nextSize;
+      itemStartOffset += bytesToRead;
+      ++itemIndex;
+      ++currId;
+
+      return retVal;
     }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
-      inputStream.close();
+
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java 
b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index 8470c63f3e7..a6ddad61406 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -456,7 +456,8 @@ public class IndexIO
                 new StringUtf8DictionaryEncodedColumnSupplier<>(
                     index.getDimValueUtf8Lookup(dimension)::singleThreaded,
                     null,
-                    Suppliers.ofInstance(index.getDimColumn(dimension))
+                    Suppliers.ofInstance(index.getDimColumn(dimension)),
+                    LEGACY_FACTORY.getBitmapFactory()
                 )
             );
         GenericIndexed<ImmutableBitmap> bitmaps = 
index.getBitmapIndexes().get(dimension);
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java 
b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
index 37a7e6cc0d6..1dc2f849682 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
@@ -43,7 +43,7 @@ import java.util.Objects;
  */
 public class IndexSpec
 {
-  public static IndexSpec DEFAULT = IndexSpec.builder().build();
+  public static final IndexSpec DEFAULT = IndexSpec.builder().build();
 
   public static Builder builder()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
index bed58a43675..59da8ffcb0a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.column;
 
 import com.google.common.collect.Lists;
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.extraction.ExtractionFn;
@@ -73,16 +74,19 @@ public class StringUtf8DictionaryEncodedColumn implements 
DictionaryEncodedColum
   @Nullable
   private final ColumnarMultiInts multiValueColumn;
   private final Indexed<ByteBuffer> utf8Dictionary;
+  private final BitmapFactory bitmapFactory;
 
   public StringUtf8DictionaryEncodedColumn(
       @Nullable ColumnarInts singleValueColumn,
       @Nullable ColumnarMultiInts multiValueColumn,
-      Indexed<ByteBuffer> utf8Dictionary
+      Indexed<ByteBuffer> utf8Dictionary,
+      BitmapFactory bitmapFactory
   )
   {
     this.column = singleValueColumn;
     this.multiValueColumn = multiValueColumn;
     this.utf8Dictionary = utf8Dictionary;
+    this.bitmapFactory = bitmapFactory;
   }
 
   @Override
@@ -135,6 +139,11 @@ public class StringUtf8DictionaryEncodedColumn implements 
DictionaryEncodedColum
     return utf8Dictionary.size();
   }
 
+  public BitmapFactory getBitmapFactory()
+  {
+    return bitmapFactory;
+  }
+
   @Override
   public HistoricalDimensionSelector makeDimensionSelector(
       final ReadableOffset offset,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index 4473fa77d46..010e0b69857 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -36,6 +36,7 @@ public class BlockLayoutColumnarDoublesSupplier implements 
Supplier<ColumnarDoub
 
   // The number of doubles per buffer.
   private final int sizePer;
+  private final CompressionStrategy strategy;
 
   public BlockLayoutColumnarDoublesSupplier(
       int totalSize,
@@ -45,7 +46,8 @@ public class BlockLayoutColumnarDoublesSupplier implements 
Supplier<ColumnarDoub
       CompressionStrategy strategy
   )
   {
-    baseDoubleBuffers = GenericIndexed.read(fromBuffer, 
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
+    this.strategy = strategy;
+    this.baseDoubleBuffers = GenericIndexed.read(fromBuffer, 
DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
     this.totalSize = totalSize;
     this.sizePer = sizePer;
   }
@@ -78,7 +80,8 @@ public class BlockLayoutColumnarDoublesSupplier implements 
Supplier<ColumnarDoub
     }
   }
 
-  private class BlockLayoutColumnarDoubles implements ColumnarDoubles
+  // This needs to be a public class so that SemanticCreator is able to call 
it.
+  public class BlockLayoutColumnarDoubles implements ColumnarDoubles
   {
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedDoubleBuffers = 
baseDoubleBuffers.singleThreaded();
 
@@ -91,6 +94,11 @@ public class BlockLayoutColumnarDoublesSupplier implements 
Supplier<ColumnarDoub
     @Nullable
     DoubleBuffer doubleBuffer;
 
+    public CompressionStrategy getCompressionStrategy()
+    {
+      return strategy;
+    }
+
     @Override
     public int size()
     {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
index 2b46d9aa6e2..aa0346c6e34 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
@@ -43,6 +43,7 @@ public class BlockLayoutColumnarLongsSupplier implements 
Supplier<ColumnarLongs>
   // The number of longs per buffer.
   private final int sizePer;
   private final CompressionFactory.LongEncodingReader baseReader;
+  private final CompressionStrategy strategy;
 
   public BlockLayoutColumnarLongsSupplier(
       int totalSize,
@@ -53,6 +54,7 @@ public class BlockLayoutColumnarLongsSupplier implements 
Supplier<ColumnarLongs>
       CompressionStrategy strategy
   )
   {
+    this.strategy = strategy;
     this.baseLongBuffers = GenericIndexed.read(fromBuffer, 
DecompressingByteBufferObjectStrategy.of(order, strategy));
     this.totalSize = totalSize;
     this.sizePer = sizePer;
@@ -124,7 +126,8 @@ public class BlockLayoutColumnarLongsSupplier implements 
Supplier<ColumnarLongs>
     }
   }
 
-  private class BlockLayoutColumnarLongs implements ColumnarLongs
+  // This needs to be a public class so that SemanticCreator is able to call 
it.
+  public class BlockLayoutColumnarLongs implements ColumnarLongs
   {
     final CompressionFactory.LongEncodingReader reader = 
baseReader.duplicate();
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedLongBuffers = 
baseLongBuffers.singleThreaded();
@@ -140,6 +143,16 @@ public class BlockLayoutColumnarLongsSupplier implements 
Supplier<ColumnarLongs>
     @Nullable
     LongBuffer longBuffer;
 
+    public CompressionFactory.LongEncodingStrategy getEncodingStrategy()
+    {
+      return baseReader.getStrategy();
+    }
+
+    public CompressionStrategy getCompressionStrategy()
+    {
+      return strategy;
+    }
+
     @Override
     public int size()
     {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index eaf0b2a47a9..dfa6667bf01 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -308,7 +308,7 @@ public class CompressedVSizeColumnarIntsSupplier implements 
WritableSupplier<Col
     }
   }
 
-  private class CompressedVSizeColumnarInts implements ColumnarInts
+  public class CompressedVSizeColumnarInts implements ColumnarInts
   {
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedBuffers = 
baseBuffers.singleThreaded();
 
@@ -329,6 +329,11 @@ public class CompressedVSizeColumnarIntsSupplier 
implements WritableSupplier<Col
       return totalSize;
     }
 
+    public CompressionStrategy getCompressionStrategy()
+    {
+      return compression;
+    }
+
     /**
      * Returns the value at the given index into the column.
      * <p/>
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
index 91ec70b7f17..9e7d2ea5b61 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
@@ -303,7 +303,6 @@ public class CompressionFactory
      */
     LongEncodingReader duplicate();
 
-    @SuppressWarnings("unused")
     LongEncodingStrategy getStrategy();
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 54653f286a9..88b5c240e1b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -79,7 +79,7 @@ public abstract class NestedCommonFormatColumnSerializer 
implements GenericColum
     ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(), 
fileName);
   }
 
-  protected void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper 
fileMapper) throws IOException
+  protected static void copyFromTempSmoosh(FileSmoosher smoosher, 
SmooshedFileMapper fileMapper) throws IOException
   {
     for (String internalName : fileMapper.getInternalFilenames()) {
       smoosher.add(internalName, fileMapper.mapFile(internalName));
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
index cb11f62bb0f..3236b42402c 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java
@@ -22,7 +22,7 @@ package org.apache.druid.segment.nested;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import it.unimi.dsi.fastutil.Hash;
 import org.apache.druid.data.input.impl.DimensionSchema;
-import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.segment.DimensionHandler;
 import org.apache.druid.segment.NestedDataColumnHandlerV4;
@@ -117,29 +117,14 @@ public class NestedDataComplexTypeSerde extends 
ComplexMetricSerde
       @Override
       public Object fromByteBuffer(ByteBuffer buffer, int numBytes)
       {
-        final byte[] bytes = new byte[numBytes];
-        buffer.get(bytes, 0, numBytes);
-        try {
-          return ColumnSerializerUtils.SMILE_MAPPER.readValue(bytes, 
StructuredData.class);
-        }
-        catch (IOException e) {
-          throw new ISE(e, "Unable to deserialize value");
-        }
+        return deserializeBuffer(buffer, numBytes);
       }
 
       @Nullable
       @Override
       public byte[] toBytes(@Nullable Object val)
       {
-        if (val == null) {
-          return new byte[0];
-        }
-        try {
-          return ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(val);
-        }
-        catch (JsonProcessingException e) {
-          throw new ISE(e, "Unable to serialize value [%s]", val);
-        }
+        return serializeToBytes(val);
       }
 
       @Override
@@ -150,6 +135,71 @@ public class NestedDataComplexTypeSerde extends 
ComplexMetricSerde
     };
   }
 
+  /**
+   * Reads the remaining bytes (from the position to the limit) of the byte buffer argument 
and deserializes them into
+   * a {@link StructuredData} object using {@link 
ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBuffer(ByteBuffer buf)
+  {
+    return deserializeBuffer(buf, buf.remaining());
+  }
+
+  /**
+   * Reads numBytes from the byte buffer argument and deserializes it into a 
{@link StructuredData} object
+   * using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBuffer(ByteBuffer buf, int numBytes)
+  {
+    if (numBytes == 0) {
+      return null;
+    }
+
+    final byte[] bytes = new byte[numBytes];
+    buf.get(bytes, 0, numBytes);
+    return deserializeBytes(bytes);
+  }
+
+  /**
+   * Converts the bytes array into a {@link StructuredData} object using 
{@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBytes(byte[] bytes)
+  {
+    return deserializeBytes(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Reads len bytes starting at offset from the byte array and 
deserializes a {@link StructuredData} object from
+   * it, using {@link ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static StructuredData deserializeBytes(byte[] bytes, int offset, int 
len)
+  {
+    if (len == 0) {
+      return null;
+    }
+    try {
+      return ColumnSerializerUtils.SMILE_MAPPER.readValue(bytes, offset, len, 
StructuredData.class);
+    }
+    catch (IOException e) {
+      throw DruidException.defensive(e, "Unable to deserialize value");
+    }
+  }
+
+  /**
+   * Returns a byte array containing the val as serialized by {@link 
ColumnSerializerUtils#SMILE_MAPPER}.
+   */
+  public static byte[] serializeToBytes(@Nullable Object val)
+  {
+    if (val == null) {
+      return new byte[0];
+    }
+    try {
+      return ColumnSerializerUtils.SMILE_MAPPER.writeValueAsBytes(val);
+    }
+    catch (JsonProcessingException e) {
+      throw DruidException.defensive(e, "Unable to serialize value [%s]", val);
+    }
+  }
+
   @Override
   public <T extends Comparable<T>> TypeStrategy<T> getTypeStrategy()
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
index 6ade80b6e83..e116a70d28d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.nested;
 
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.semantic.SemanticUtils;
@@ -27,6 +28,7 @@ import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DoubleColumnSelector;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.ColumnarDoubles;
+import org.apache.druid.segment.data.ColumnarInts;
 import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.ReadableOffset;
@@ -40,6 +42,7 @@ import org.roaringbitmap.PeekableIntIterator;
 import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * {@link NestedCommonFormatColumn} for {@link ColumnType#DOUBLE}
@@ -50,18 +53,24 @@ public class ScalarDoubleColumn implements 
NestedCommonFormatColumn
       SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
 
   private final FixedIndexed<Double> doubleDictionary;
+  private final Supplier<ColumnarInts> encodedValuesSupplier;
   private final ColumnarDoubles valueColumn;
-  private final ImmutableBitmap nullValueIndex;
+  private final ImmutableBitmap nullValueBitmap;
+  private final BitmapFactory bitmapFactory;
 
   public ScalarDoubleColumn(
       FixedIndexed<Double> doubleDictionary,
+      Supplier<ColumnarInts> encodedValuesSupplier,
       ColumnarDoubles valueColumn,
-      ImmutableBitmap nullValueIndex
+      ImmutableBitmap nullValueBitmap,
+      BitmapFactory bitmapFactory
   )
   {
     this.doubleDictionary = doubleDictionary;
+    this.encodedValuesSupplier = encodedValuesSupplier;
     this.valueColumn = valueColumn;
-    this.nullValueIndex = nullValueIndex;
+    this.nullValueBitmap = nullValueBitmap;
+    this.bitmapFactory = bitmapFactory;
   }
 
   @Override
@@ -81,7 +90,7 @@ public class ScalarDoubleColumn implements 
NestedCommonFormatColumn
   {
     return new DoubleColumnSelector()
     {
-      private PeekableIntIterator nullIterator = 
nullValueIndex.peekableIterator();
+      private PeekableIntIterator nullIterator = 
nullValueBitmap.peekableIterator();
       private int nullMark = -1;
       private int offsetMark = -1;
 
@@ -95,7 +104,7 @@ public class ScalarDoubleColumn implements 
NestedCommonFormatColumn
       public void inspectRuntimeShape(RuntimeShapeInspector inspector)
       {
         inspector.visit("doubleColumn", valueColumn);
-        inspector.visit("nullBitmap", nullValueIndex);
+        inspector.visit("nullBitmap", nullValueBitmap);
       }
 
       @Override
@@ -108,7 +117,7 @@ public class ScalarDoubleColumn implements 
NestedCommonFormatColumn
         if (i < offsetMark) {
           // offset was reset, reset iterator state
           nullMark = -1;
-          nullIterator = nullValueIndex.peekableIterator();
+          nullIterator = nullValueBitmap.peekableIterator();
         }
         offsetMark = i;
         if (nullMark < i) {
@@ -133,7 +142,7 @@ public class ScalarDoubleColumn implements 
NestedCommonFormatColumn
       private int id = ReadableVectorInspector.NULL_ID;
 
       @Nullable
-      private PeekableIntIterator nullIterator = nullValueIndex != null ? 
nullValueIndex.peekableIterator() : null;
+      private PeekableIntIterator nullIterator = nullValueBitmap != null ? 
nullValueBitmap.peekableIterator() : null;
       private int offsetMark = -1;
 
       @Override
@@ -162,14 +171,14 @@ public class ScalarDoubleColumn implements 
NestedCommonFormatColumn
 
         if (offset.isContiguous()) {
           if (offset.getStartOffset() < offsetMark) {
-            nullIterator = nullValueIndex.peekableIterator();
+            nullIterator = nullValueBitmap.peekableIterator();
           }
           offsetMark = offset.getStartOffset() + offset.getCurrentVectorSize();
           valueColumn.get(valueVector, offset.getStartOffset(), 
offset.getCurrentVectorSize());
         } else {
           final int[] offsets = offset.getOffsets();
           if (offsets[offsets.length - 1] < offsetMark) {
-            nullIterator = nullValueIndex.peekableIterator();
+            nullIterator = nullValueBitmap.peekableIterator();
           }
           offsetMark = offsets[offsets.length - 1];
           valueColumn.get(valueVector, offsets, offset.getCurrentVectorSize());
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
index 6fa2fe6d0f0..1bde18e188a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
@@ -51,7 +51,9 @@ import org.apache.druid.segment.column.TypeSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.ColumnarDoubles;
+import org.apache.druid.segment.data.ColumnarInts;
 import org.apache.druid.segment.data.CompressedColumnarDoublesSuppliers;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
 import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.VByte;
@@ -109,6 +111,11 @@ public class ScalarDoubleColumnAndIndexSupplier implements 
Supplier<NestedCommon
             columnName,
             ColumnSerializerUtils.DOUBLE_VALUE_COLUMN_FILE_NAME
         );
+        final ByteBuffer encodedValuesBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
+            mapper,
+            columnName,
+            ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
+        );
 
         final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
         if (parent != null) {
@@ -128,6 +135,12 @@ public class ScalarDoubleColumnAndIndexSupplier implements 
Supplier<NestedCommon
           );
         }
 
+        final CompressedVSizeColumnarIntsSupplier encodedCol = 
CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+            encodedValuesBuffer,
+            byteOrder,
+            columnBuilder.getFileMapper()
+        );
+
         final Supplier<ColumnarDoubles> doubles = 
CompressedColumnarDoublesSuppliers.fromByteBuffer(
             doublesValueColumn,
             byteOrder
@@ -144,6 +157,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements 
Supplier<NestedCommon
         );
         return new ScalarDoubleColumnAndIndexSupplier(
             doubleDictionarySupplier,
+            encodedCol,
             doubles,
             rBitmaps,
             bitmapSerdeFactory.getBitmapFactory(),
@@ -160,6 +174,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements 
Supplier<NestedCommon
 
   private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
 
+  private final Supplier<ColumnarInts> encodedValuesSupplier;
   private final Supplier<ColumnarDoubles> valueColumnSupplier;
 
   private final GenericIndexed<ImmutableBitmap> valueIndexes;
@@ -170,6 +185,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements 
Supplier<NestedCommon
 
   private ScalarDoubleColumnAndIndexSupplier(
       Supplier<FixedIndexed<Double>> longDictionary,
+      Supplier<ColumnarInts> encodedValuesSupplier,
       Supplier<ColumnarDoubles> valueColumnSupplier,
       GenericIndexed<ImmutableBitmap> valueIndexes,
       BitmapFactory bitmapFactory,
@@ -177,6 +193,7 @@ public class ScalarDoubleColumnAndIndexSupplier implements 
Supplier<NestedCommon
   )
   {
     this.doubleDictionarySupplier = longDictionary;
+    this.encodedValuesSupplier = encodedValuesSupplier;
     this.valueColumnSupplier = valueColumnSupplier;
     this.valueIndexes = valueIndexes;
     this.bitmapFactory = bitmapFactory;
@@ -189,8 +206,10 @@ public class ScalarDoubleColumnAndIndexSupplier implements 
Supplier<NestedCommon
   {
     return new ScalarDoubleColumn(
         doubleDictionarySupplier.get(),
+        encodedValuesSupplier,
         valueColumnSupplier.get(),
-        nullValueBitmap
+        nullValueBitmap,
+        bitmapFactory
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
index 8a54ff31278..4d5db6a45f4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumn.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.nested;
 
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.semantic.SemanticUtils;
@@ -26,6 +27,7 @@ import 
org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.LongColumnSelector;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.ColumnarInts;
 import org.apache.druid.segment.data.ColumnarLongs;
 import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.Indexed;
@@ -40,6 +42,7 @@ import org.roaringbitmap.PeekableIntIterator;
 import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * {@link NestedCommonFormatColumn} for {@link ColumnType#LONG}
@@ -50,18 +53,24 @@ public class ScalarLongColumn implements 
NestedCommonFormatColumn
       SemanticUtils.makeAsMap(ScalarLongColumn.class);
 
   private final FixedIndexed<Long> longDictionary;
+  private final Supplier<ColumnarInts> encodedValuesSupplier;
   private final ColumnarLongs valueColumn;
-  private final ImmutableBitmap nullValueIndex;
+  private final ImmutableBitmap nullValueBitmap;
+  private final BitmapFactory bitmapFactory;
 
   public ScalarLongColumn(
       FixedIndexed<Long> longDictionary,
+      Supplier<ColumnarInts> encodedValuesSupplier,
       ColumnarLongs valueColumn,
-      ImmutableBitmap nullValueIndex
+      ImmutableBitmap nullValueBitmap,
+      BitmapFactory bitmapFactory
   )
   {
     this.longDictionary = longDictionary;
+    this.encodedValuesSupplier = encodedValuesSupplier;
     this.valueColumn = valueColumn;
-    this.nullValueIndex = nullValueIndex;
+    this.nullValueBitmap = nullValueBitmap;
+    this.bitmapFactory = bitmapFactory;
   }
 
 
@@ -82,7 +91,7 @@ public class ScalarLongColumn implements 
NestedCommonFormatColumn
   {
     return new LongColumnSelector()
     {
-      private PeekableIntIterator nullIterator = 
nullValueIndex.peekableIterator();
+      private PeekableIntIterator nullIterator = 
nullValueBitmap.peekableIterator();
       private int nullMark = -1;
       private int offsetMark = -1;
 
@@ -96,7 +105,7 @@ public class ScalarLongColumn implements 
NestedCommonFormatColumn
       public void inspectRuntimeShape(RuntimeShapeInspector inspector)
       {
         inspector.visit("longColumn", valueColumn);
-        inspector.visit("nullBitmap", nullValueIndex);
+        inspector.visit("nullBitmap", nullValueBitmap);
       }
 
       @Override
@@ -109,7 +118,7 @@ public class ScalarLongColumn implements 
NestedCommonFormatColumn
         if (i < offsetMark) {
           // offset was reset, reset iterator state
           nullMark = -1;
-          nullIterator = nullValueIndex.peekableIterator();
+          nullIterator = nullValueBitmap.peekableIterator();
         }
         offsetMark = i;
         if (nullMark < i) {
@@ -134,7 +143,7 @@ public class ScalarLongColumn implements 
NestedCommonFormatColumn
       private int id = ReadableVectorInspector.NULL_ID;
 
       @Nullable
-      private PeekableIntIterator nullIterator = 
nullValueIndex.peekableIterator();
+      private PeekableIntIterator nullIterator = 
nullValueBitmap.peekableIterator();
       private int offsetMark = -1;
 
       @Override
@@ -163,14 +172,14 @@ public class ScalarLongColumn implements 
NestedCommonFormatColumn
 
         if (offset.isContiguous()) {
           if (offset.getStartOffset() < offsetMark) {
-            nullIterator = nullValueIndex.peekableIterator();
+            nullIterator = nullValueBitmap.peekableIterator();
           }
           offsetMark = offset.getStartOffset() + offset.getCurrentVectorSize();
           valueColumn.get(valueVector, offset.getStartOffset(), 
offset.getCurrentVectorSize());
         } else {
           final int[] offsets = offset.getOffsets();
           if (offsets[offsets.length - 1] < offsetMark) {
-            nullIterator = nullValueIndex.peekableIterator();
+            nullIterator = nullValueBitmap.peekableIterator();
           }
           offsetMark = offsets[offsets.length - 1];
           valueColumn.get(valueVector, offsets, offset.getCurrentVectorSize());
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
index a8d1fa057d8..1b1b9fb97e7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
@@ -49,8 +49,10 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.TypeSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
+import org.apache.druid.segment.data.ColumnarInts;
 import org.apache.druid.segment.data.ColumnarLongs;
 import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
+import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
 import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.VByte;
@@ -103,6 +105,11 @@ public class ScalarLongColumnAndIndexSupplier implements 
Supplier<NestedCommonFo
 
         final SmooshedFileMapper mapper = columnBuilder.getFileMapper();
 
+        final ByteBuffer encodedValuesBuffer = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
+            mapper,
+            columnName,
+            ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME
+        );
         final ByteBuffer longsValueColumn = 
NestedCommonFormatColumnPartSerde.loadInternalFile(
             mapper,
             columnName,
@@ -137,12 +144,19 @@ public class ScalarLongColumnAndIndexSupplier implements 
Supplier<NestedCommonFo
           );
         }
 
+        final CompressedVSizeColumnarIntsSupplier encodedCol = 
CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+            encodedValuesBuffer,
+            byteOrder,
+            columnBuilder.getFileMapper()
+        );
+
         final Supplier<ColumnarLongs> longs = 
CompressedColumnarLongsSupplier.fromByteBuffer(
             longsValueColumn,
             byteOrder
         );
         return new ScalarLongColumnAndIndexSupplier(
             longDictionarySupplier,
+            encodedCol,
             longs,
             rBitmaps,
             bitmapSerdeFactory.getBitmapFactory(),
@@ -159,6 +173,7 @@ public class ScalarLongColumnAndIndexSupplier implements 
Supplier<NestedCommonFo
 
   private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
 
+  private final Supplier<ColumnarInts> encodedValuesSupplier;
   private final Supplier<ColumnarLongs> valueColumnSupplier;
 
   private final GenericIndexed<ImmutableBitmap> valueIndexes;
@@ -170,6 +185,7 @@ public class ScalarLongColumnAndIndexSupplier implements 
Supplier<NestedCommonFo
 
   private ScalarLongColumnAndIndexSupplier(
       Supplier<FixedIndexed<Long>> longDictionarySupplier,
+      Supplier<ColumnarInts> encodedValuesSupplier,
       Supplier<ColumnarLongs> valueColumnSupplier,
       GenericIndexed<ImmutableBitmap> valueIndexes,
       BitmapFactory bitmapFactory,
@@ -177,6 +193,7 @@ public class ScalarLongColumnAndIndexSupplier implements 
Supplier<NestedCommonFo
   )
   {
     this.longDictionarySupplier = longDictionarySupplier;
+    this.encodedValuesSupplier = encodedValuesSupplier;
     this.valueColumnSupplier = valueColumnSupplier;
     this.valueIndexes = valueIndexes;
     this.bitmapFactory = bitmapFactory;
@@ -189,8 +206,10 @@ public class ScalarLongColumnAndIndexSupplier implements 
Supplier<NestedCommonFo
   {
     return new ScalarLongColumn(
         longDictionarySupplier.get(),
+        encodedValuesSupplier,
         valueColumnSupplier.get(),
-        nullValueBitmap
+        nullValueBitmap,
+        bitmapFactory
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
index 7f1111c2e8b..ee7fe1475b7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnAndIndexSupplier.java
@@ -117,6 +117,7 @@ public class ScalarStringColumnAndIndexSupplier implements 
Supplier<NestedCommon
   private final Supplier<ColumnarInts> encodedColumnSupplier;
   private final GenericIndexed<ImmutableBitmap> valueIndexes;
   private final ColumnIndexSupplier stringIndexSupplier;
+  private final BitmapSerdeFactory serdeFactory;
 
   private ScalarStringColumnAndIndexSupplier(
       Supplier<? extends Indexed<ByteBuffer>> dictionarySupplier,
@@ -128,6 +129,7 @@ public class ScalarStringColumnAndIndexSupplier implements 
Supplier<NestedCommon
     this.dictionarySupplier = dictionarySupplier;
     this.encodedColumnSupplier = encodedColumnSupplier;
     this.valueIndexes = valueIndexes;
+    this.serdeFactory = serdeFactory;
     this.stringIndexSupplier = new StringUtf8ColumnIndexSupplier<>(
         serdeFactory.getBitmapFactory(),
         dictionarySupplier,
@@ -139,7 +141,12 @@ public class ScalarStringColumnAndIndexSupplier implements 
Supplier<NestedCommon
   @Override
   public NestedCommonFormatColumn get()
   {
-    return new StringUtf8DictionaryEncodedColumn(encodedColumnSupplier.get(), 
null, dictionarySupplier.get());
+    return new StringUtf8DictionaryEncodedColumn(
+        encodedColumnSupplier.get(),
+        null,
+        dictionarySupplier.get(),
+        serdeFactory.getBitmapFactory()
+    );
   }
 
   @Nullable
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
index dfd6307148d..8125fe53100 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
@@ -23,6 +23,7 @@ import com.google.common.primitives.Doubles;
 import com.google.common.primitives.Floats;
 import it.unimi.dsi.fastutil.ints.IntArraySet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.guava.GuavaUtils;
 import org.apache.druid.common.semantic.SemanticUtils;
@@ -96,6 +97,7 @@ public class VariantColumn<TStringDictionary extends 
Indexed<ByteBuffer>>
   private final ExpressionType logicalExpressionType;
   @Nullable
   private final FieldTypeInfo.TypeSet variantTypes;
+  private final BitmapFactory bitmapFactory;
   private final int adjustLongId;
   private final int adjustDoubleId;
   private final int adjustArrayId;
@@ -108,7 +110,8 @@ public class VariantColumn<TStringDictionary extends 
Indexed<ByteBuffer>>
       ColumnarInts encodedValueColumn,
       ImmutableBitmap nullValueBitmap,
       ColumnType logicalType,
-      @Nullable Byte variantTypeSetByte
+      @Nullable Byte variantTypeSetByte,
+      BitmapFactory bitmapFactory
   )
   {
     this.stringDictionary = stringDictionary;
@@ -119,6 +122,7 @@ public class VariantColumn<TStringDictionary extends 
Indexed<ByteBuffer>>
     this.nullValueBitmap = nullValueBitmap;
     this.logicalExpressionType = 
ExpressionType.fromColumnTypeStrict(logicalType);
     this.variantTypes = variantTypeSetByte == null ? null : new 
FieldTypeInfo.TypeSet(variantTypeSetByte);
+    this.bitmapFactory = bitmapFactory;
     // use the variant type bytes if set, in current code the logical type 
should have been computed via this same means
     // however older versions of the code had a bug which could incorrectly 
classify mixed types as nested data
     if (variantTypeSetByte != null) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
index b93235b5bab..6a2ec476976 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
@@ -268,7 +268,8 @@ public class VariantColumnAndIndexSupplier implements 
Supplier<NestedCommonForma
         encodedValueColumnSupplier.get(),
         nullValueBitmap,
         logicalType,
-        variantTypeSetByte
+        variantTypeSetByte,
+        bitmapFactory
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 9adb8fac2b7..5b706b44b7f 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -20,12 +20,11 @@
 package org.apache.druid.segment.nested;
 
 import com.google.common.base.Preconditions;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
 import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -47,6 +46,7 @@ import 
org.apache.druid.segment.data.FrontCodedIntArrayIndexedWriter;
 import org.apache.druid.segment.data.GenericIndexedWriter;
 import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import org.apache.druid.segment.serde.ColumnSerializerUtils;
+import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
@@ -55,6 +55,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
 
 /**
  * Serializer for a {@link NestedCommonFormatColumn} for single type arrays 
and mixed type columns, but not columns
@@ -80,7 +81,6 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   private boolean dictionarySerialized = false;
   private FixedIndexedIntWriter intermediateValueWriter;
 
-  private ByteBuffer columnNameBytes = null;
   private boolean hasNulls;
   private boolean writeDictionary = true;
   @Nullable
@@ -88,6 +88,8 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   @Nullable
   private final Byte variantTypeSetByte;
 
+  private InternalSerializer internalSerializer = null;
+
   public VariantColumnSerializer(
       String name,
       @Nullable ColumnType logicalType,
@@ -299,155 +301,246 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
     }
   }
 
-  private void closeForWrite()
+  private void closeForWrite() throws IOException
   {
     if (!closedForWrite) {
-      columnNameBytes = computeFilenameBytes();
+      // write out compressed dictionaryId int column, bitmap indexes, and 
array element bitmap indexes
+      // by iterating intermediate value column. The intermediate value column 
should be replaced someday by a cooler
+      // compressed int column writer that allows easy iteration of the values 
it writes out, so that we could just
+      // build the bitmap indexes here instead of doing both things
+      String filenameBase = StringUtils.format("%s.forward_dim", name);
+      final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+                                    + dictionaryIdLookup.getLongCardinality()
+                                    + 
dictionaryIdLookup.getDoubleCardinality();
+      final int cardinality = scalarCardinality + 
dictionaryIdLookup.getArrayCardinality();
+      final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
+      final CompressionStrategy compressionToUse;
+      if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
+        compressionToUse = compression;
+      } else {
+        compressionToUse = CompressionStrategy.LZ4;
+      }
+
+      final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
+          name,
+          segmentWriteOutMedium,
+          filenameBase,
+          cardinality,
+          compressionToUse,
+          segmentWriteOutMedium.getCloser()
+      );
+      encodedValueSerializer.open();
+
+      final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name,
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      bitmapIndexWriter.open();
+      bitmapIndexWriter.setObjectsNotSorted();
+      final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+      final MutableBitmap[] arrayElements = new 
MutableBitmap[scalarCardinality];
+      for (int i = 0; i < bitmaps.length; i++) {
+        bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+      }
+      final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = 
new GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name + "_arrays",
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      arrayElementIndexWriter.open();
+      arrayElementIndexWriter.setObjectsNotSorted();
+
+      final IntIterator rows = intermediateValueWriter.getIterator();
+      int rowCount = 0;
+      final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+                              + dictionaryIdLookup.getLongCardinality()
+                              + dictionaryIdLookup.getDoubleCardinality();
+      while (rows.hasNext()) {
+        final int dictId = rows.nextInt();
+        encodedValueSerializer.addValue(dictId);
+        bitmaps[dictId].add(rowCount);
+        if (dictId >= arrayBaseId) {
+          int[] array = dictionaryIdLookup.getArrayValue(dictId);
+          for (int elementId : array) {
+            MutableBitmap bitmap = arrayElements[elementId];
+            if (bitmap == null) {
+              bitmap = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+              arrayElements[elementId] = bitmap;
+            }
+            bitmap.add(rowCount);
+          }
+        }
+        rowCount++;
+      }
+
+      for (int i = 0; i < bitmaps.length; i++) {
+        final MutableBitmap bitmap = bitmaps[i];
+        bitmapIndexWriter.write(
+            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+        );
+        bitmaps[i] = null; // Reclaim memory
+      }
+      if (writeDictionary) {
+        for (int i = 0; i < arrayElements.length; ++i) {
+          if (arrayElements[i] != null) {
+            arrayElementDictionaryWriter.write(i);
+            arrayElementIndexWriter.write(arrayElements[i]);
+          }
+        }
+      }
+
       closedForWrite = true;
+      internalSerializer = new InternalSerializer(
+          name,
+          variantTypeSetByte,
+          dictionaryWriter,
+          longDictionaryWriter,
+          doubleDictionaryWriter,
+          arrayDictionaryWriter,
+          encodedValueSerializer,
+          bitmapIndexWriter,
+          arrayElementDictionaryWriter,
+          arrayElementIndexWriter,
+          dictionaryIdLookup,
+          writeDictionary
+      );
     }
   }
 
   @Override
-  public long getSerializedSize()
+  public long getSerializedSize() throws IOException
   {
     closeForWrite();
-
-    long size = 1 + columnNameBytes.capacity();
-    // the value dictionaries, raw column, and null index are all stored in 
separate files
-    if (variantTypeSetByte != null) {
-      size += 1;
-    }
-    return size;
+    return internalSerializer.getSerializedSize();
   }
 
   @Override
-  public void writeTo(
-      WritableByteChannel channel,
-      FileSmoosher smoosher
-  ) throws IOException
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
   {
-    Preconditions.checkState(closedForWrite, "Not closed yet!");
-    if (writeDictionary) {
-      Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not 
sorted?!?");
-    }
-
-    // write out compressed dictionaryId int column, bitmap indexes, and array 
element bitmap indexes
-    // by iterating intermediate value column the intermediate value column 
should be replaced someday by a cooler
-    // compressed int column writer that allows easy iteration of the values 
it writes out, so that we could just
-    // build the bitmap indexes here instead of doing both things
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final int cardinality = dictionaryIdLookup.getStringCardinality()
-                            + dictionaryIdLookup.getLongCardinality()
-                            + dictionaryIdLookup.getDoubleCardinality()
-                            + dictionaryIdLookup.getArrayCardinality();
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-
-    final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        cardinality,
-        compressionToUse,
-        segmentWriteOutMedium.getCloser()
-    );
-    encodedValueSerializer.open();
+    closeForWrite();
+    internalSerializer.writeTo(channel, smoosher);
+  }
 
-    final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
-    final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new 
Int2ObjectRBTreeMap<>();
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-    final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new 
GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name + "_arrays",
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    arrayElementIndexWriter.open();
-    arrayElementIndexWriter.setObjectsNotSorted();
-
-    final IntIterator rows = intermediateValueWriter.getIterator();
-    int rowCount = 0;
-    final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
-                            + dictionaryIdLookup.getLongCardinality()
-                            + dictionaryIdLookup.getDoubleCardinality();
-    while (rows.hasNext()) {
-      final int dictId = rows.nextInt();
-      encodedValueSerializer.addValue(dictId);
-      bitmaps[dictId].add(rowCount);
-      if (dictId >= arrayBaseId) {
-        int[] array = dictionaryIdLookup.getArrayValue(dictId);
-        for (int elementId : array) {
-          arrayElements.computeIfAbsent(
-              elementId,
-              (id) -> 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
-          ).add(rowCount);
+  /**
+   * Internal serializer used to serialize a {@link VariantColumn}. Contains 
the logic to write out the column to a
+   * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it 
is closed for writes.
+   */
+  public static class InternalSerializer implements Serializer
+  {
+    private final String columnName;
+    private final ByteBuffer columnNameBytes;
+    private final Byte variantTypeSetByte;
+
+    private final DictionaryWriter<String> dictionaryWriter;
+    private final FixedIndexedWriter<Long> longDictionaryWriter;
+    private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+    private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+    private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+    private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+    private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+    private final GenericIndexedWriter<ImmutableBitmap> 
arrayElementIndexWriter;
+    private final boolean writeDictionary;
+
+    private final DictionaryIdLookup dictionaryIdLookup;
+
+    public InternalSerializer(
+        String columnName,
+        Byte variantTypeSetByte,
+        DictionaryWriter<String> dictionaryWriter,
+        FixedIndexedWriter<Long> longDictionaryWriter,
+        FixedIndexedWriter<Double> doubleDictionaryWriter,
+        FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+        SingleValueColumnarIntsSerializer encodedValueSerializer,
+        GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+        FixedIndexedIntWriter arrayElementDictionaryWriter,
+        GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+        DictionaryIdLookup dictionaryIdLookup,
+        boolean writeDictionary
+    )
+    {
+      this.columnName = columnName;
+      this.columnNameBytes = 
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+      this.variantTypeSetByte = variantTypeSetByte;
+      this.dictionaryWriter = dictionaryWriter;
+      this.longDictionaryWriter = longDictionaryWriter;
+      this.doubleDictionaryWriter = doubleDictionaryWriter;
+      this.arrayDictionaryWriter = arrayDictionaryWriter;
+      this.encodedValueSerializer = encodedValueSerializer;
+      this.bitmapIndexWriter = bitmapIndexWriter;
+      this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+      this.arrayElementIndexWriter = arrayElementIndexWriter;
+      this.writeDictionary = writeDictionary;
+      this.dictionaryIdLookup = dictionaryIdLookup;
+
+      boolean[] dictionariesSorted = new boolean[]{
+          dictionaryWriter.isSorted(),
+          longDictionaryWriter.isSorted(),
+          doubleDictionaryWriter.isSorted(),
+          arrayDictionaryWriter.isSorted()
+      };
+      for (boolean sorted : dictionariesSorted) {
+        if (writeDictionary && !sorted) {
+          throw DruidException.defensive(
+              "Dictionary is not sorted? [%s]  Should always be sorted",
+              Arrays.toString(dictionariesSorted)
+          );
         }
       }
-      rowCount++;
     }
 
-    for (int i = 0; i < bitmaps.length; i++) {
-      final MutableBitmap bitmap = bitmaps[i];
-      bitmapIndexWriter.write(
-          
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
-      );
-      bitmaps[i] = null; // Reclaim memory
-    }
-    if (writeDictionary) {
-      for (Int2ObjectMap.Entry<MutableBitmap> arrayElement : 
arrayElements.int2ObjectEntrySet()) {
-        arrayElementDictionaryWriter.write(arrayElement.getIntKey());
-        arrayElementIndexWriter.write(
-            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
-        );
+    @Override
+    public long getSerializedSize()
+    {
+      long size = 1 + columnNameBytes.capacity();
+      // the value dictionaries, indexes, array element indexes and dictionary id columns are all stored in separate files
+      if (variantTypeSetByte != null) {
+        size += 1;
       }
+      return size;
     }
 
-    writeV0Header(channel, columnNameBytes);
-    if (variantTypeSetByte != null) {
-      channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));
-    }
-
-    if (writeDictionary) {
-      if (dictionaryIdLookup.getStringBufferMapper() != null) {
-        copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getStringBufferMapper());
-      } else {
-        writeInternal(smoosher, dictionaryWriter, 
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
-      }
-      if (dictionaryIdLookup.getLongBufferMapper() != null) {
-        copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
-      } else {
-        writeInternal(smoosher, longDictionaryWriter, 
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
-      }
-      if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
-        copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getDoubleBufferMapper());
-      } else {
-        writeInternal(smoosher, doubleDictionaryWriter, 
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
+    @Override
+    public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
+    {
+      writeV0Header(channel, columnNameBytes);
+      if (variantTypeSetByte != null) {
+        channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));
       }
-      if (dictionaryIdLookup.getArrayBufferMapper() != null) {
-        copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getArrayBufferMapper());
-      } else {
-        writeInternal(smoosher, arrayDictionaryWriter, 
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
+
+      if (writeDictionary) {
+        if (dictionaryIdLookup.getStringBufferMapper() != null) {
+          copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getStringBufferMapper());
+        } else {
+          ColumnSerializerUtils.writeInternal(smoosher, dictionaryWriter, 
columnName, ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
+        }
+
+        if (dictionaryIdLookup.getLongBufferMapper() != null) {
+          copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getLongBufferMapper());
+        } else {
+          ColumnSerializerUtils.writeInternal(smoosher, longDictionaryWriter, 
columnName, ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
+        }
+
+        if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
+          copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getDoubleBufferMapper());
+        } else {
+          ColumnSerializerUtils.writeInternal(smoosher, 
doubleDictionaryWriter, columnName, 
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
+        }
+        if (dictionaryIdLookup.getArrayBufferMapper() != null) {
+          copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getArrayBufferMapper());
+        } else {
+          ColumnSerializerUtils.writeInternal(smoosher, arrayDictionaryWriter, 
columnName, ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
+        }
+
+        ColumnSerializerUtils.writeInternal(smoosher, 
arrayElementDictionaryWriter, columnName, 
ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME);
       }
+      ColumnSerializerUtils.writeInternal(smoosher, encodedValueSerializer, 
columnName, ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME);
+      ColumnSerializerUtils.writeInternal(smoosher, bitmapIndexWriter, 
columnName, ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME);
+      ColumnSerializerUtils.writeInternal(smoosher, arrayElementIndexWriter, 
columnName, ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME);
 
-      writeInternal(smoosher, arrayElementDictionaryWriter, 
ColumnSerializerUtils.ARRAY_ELEMENT_DICTIONARY_FILE_NAME);
+      log.info("Column [%s] serialized successfully.", columnName);
     }
-    writeInternal(smoosher, encodedValueSerializer, 
ColumnSerializerUtils.ENCODED_VALUE_COLUMN_FILE_NAME);
-    writeInternal(smoosher, bitmapIndexWriter, 
ColumnSerializerUtils.BITMAP_INDEX_FILE_NAME);
-    writeInternal(smoosher, arrayElementIndexWriter, 
ColumnSerializerUtils.ARRAY_ELEMENT_BITMAP_INDEX_FILE_NAME);
-
-    log.info("Column [%s] serialized successfully.", name);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
index cf9b7c70d56..02e7f5b4d39 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
@@ -344,7 +344,8 @@ public class DictionaryEncodedColumnPartSerde implements 
ColumnPartSerde
         final StringUtf8DictionaryEncodedColumnSupplier<?> supplier = new 
StringUtf8DictionaryEncodedColumnSupplier<>(
             dictionarySupplier,
             rSingleValuedColumn,
-            rMultiValuedColumn
+            rMultiValuedColumn,
+            bitmapSerdeFactory.getBitmapFactory()
         );
         builder.setHasMultipleValues(hasMultipleValues)
                .setHasNulls(hasNulls)
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java
index 33de1869e27..6d443ebf11d 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/StringUtf8DictionaryEncodedColumnSupplier.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.serde;
 
 import com.google.common.base.Supplier;
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.segment.column.DictionaryEncodedColumn;
 import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
@@ -38,16 +39,19 @@ public class 
StringUtf8DictionaryEncodedColumnSupplier<TIndexed extends Indexed<
   private final Supplier<TIndexed> utf8Dictionary;
   private final @Nullable Supplier<ColumnarInts> singleValuedColumn;
   private final @Nullable Supplier<ColumnarMultiInts> multiValuedColumn;
+  private final BitmapFactory bitmapFactory;
 
   public StringUtf8DictionaryEncodedColumnSupplier(
       Supplier<TIndexed> utf8Dictionary,
       @Nullable Supplier<ColumnarInts> singleValuedColumn,
-      @Nullable Supplier<ColumnarMultiInts> multiValuedColumn
+      @Nullable Supplier<ColumnarMultiInts> multiValuedColumn,
+      BitmapFactory bitmapFactory
   )
   {
     this.utf8Dictionary = utf8Dictionary;
     this.singleValuedColumn = singleValuedColumn;
     this.multiValuedColumn = multiValuedColumn;
+    this.bitmapFactory = bitmapFactory;
   }
 
   public Supplier<TIndexed> getDictionary()
@@ -64,19 +68,22 @@ public class 
StringUtf8DictionaryEncodedColumnSupplier<TIndexed extends Indexed<
       return new StringUtf8DictionaryEncodedColumn(
           singleValuedColumn != null ? new 
CombineFirstTwoValuesColumnarInts(singleValuedColumn.get()) : null,
           multiValuedColumn != null ? new 
CombineFirstTwoValuesColumnarMultiInts(multiValuedColumn.get()) : null,
-          CombineFirstTwoEntriesIndexed.returnNull(suppliedUtf8Dictionary)
+          CombineFirstTwoEntriesIndexed.returnNull(suppliedUtf8Dictionary),
+          bitmapFactory
       );
     } else if 
(NullHandling.mustReplaceFirstValueWithNullInDictionary(suppliedUtf8Dictionary))
 {
       return new StringUtf8DictionaryEncodedColumn(
           singleValuedColumn != null ? singleValuedColumn.get() : null,
           multiValuedColumn != null ? multiValuedColumn.get() : null,
-          new ReplaceFirstValueWithNullIndexed<>(suppliedUtf8Dictionary)
+          new ReplaceFirstValueWithNullIndexed<>(suppliedUtf8Dictionary),
+          bitmapFactory
       );
     } else {
       return new StringUtf8DictionaryEncodedColumn(
           singleValuedColumn != null ? singleValuedColumn.get() : null,
           multiValuedColumn != null ? multiValuedColumn.get() : null,
-          suppliedUtf8Dictionary
+          suppliedUtf8Dictionary,
+          bitmapFactory
       );
     }
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
index 3931601dd4f..0825e0e46bd 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
@@ -30,10 +30,10 @@ import java.io.IOException;
  */
 public interface IOIterator<T> extends Closeable
 {
-  boolean hasNext() throws IOException;
+  boolean hasNext();
 
   T next() throws IOException;
 
   @Override
-  void close() throws IOException;
+  void close();
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
index ffbf00a9c40..476795b9f5e 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
@@ -64,6 +64,41 @@ import java.nio.ByteOrder;
  */
 public interface StagedSerde<T>
 {
+  static StagedSerde<byte[]> forBytes()
+  {
+    return new StagedSerde<byte[]>()
+    {
+      @Override
+      public byte[] serialize(byte[] value)
+      {
+        return value;
+      }
+
+      @Override
+      public byte[] deserialize(byte[] bytes)
+      {
+        return bytes;
+      }
+
+      @Override
+      public StorableBuffer serializeDelayed(@Nullable byte[] value)
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Nullable
+      @Override
+      public byte[] deserialize(ByteBuffer byteBuffer)
+      {
+        byte[] retVal = new byte[byteBuffer.remaining()];
+        int position = byteBuffer.position();
+        byteBuffer.get(retVal);
+        byteBuffer.position(position);
+        return retVal;
+      }
+    };
+  }
+
   /**
    * Useful method when some computation is necessary to prepare for serialization without actually writing out
    * all the bytes in order to determine the serialized size. It allows encapsulation of the size computation and
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
index ff4dc11f220..0a79b97f523 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/PredicateValueMatcherFactoryTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.filter;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.SelectorPredicateFactory;
@@ -76,7 +77,8 @@ public class PredicateValueMatcherFactoryTest extends 
InitializedNullHandlingTes
             GenericIndexed.UTF8_STRATEGY
         )::singleThreaded,
         null,
-        () -> 
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
 int[]{1})))
+        () -> 
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
 int[]{1}))),
+        new RoaringBitmapFactory()
     );
     final ValueMatcher matcher = forSelector("v2")
         .makeDimensionProcessor(columnSupplier.get().makeDimensionSelector(new 
SimpleAscendingOffset(1), null), true);
@@ -97,7 +99,8 @@ public class PredicateValueMatcherFactoryTest extends 
InitializedNullHandlingTes
             GenericIndexed.UTF8_STRATEGY
         )::singleThreaded,
         null,
-        () -> 
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
 int[]{1})))
+        () -> 
VSizeColumnarMultiInts.fromIterable(ImmutableList.of(VSizeColumnarInts.fromArray(new
 int[]{1}))),
+        new RoaringBitmapFactory()
     );
     final ValueMatcher matcher = forSelector("v3")
         .makeDimensionProcessor(columnSupplier.get().makeDimensionSelector(new 
SimpleAscendingOffset(1), null), true);
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java b/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
index 86e4e0ee08b..bdb15ee445a 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/ValueMatchersTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.filter;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.DruidObjectPredicate;
 import org.apache.druid.query.filter.DruidPredicateMatch;
@@ -47,13 +48,15 @@ public class ValueMatchersTest extends 
InitializedNullHandlingTest
   @Before
   public void setup()
   {
+    final RoaringBitmapFactory bitmapFactory = new RoaringBitmapFactory();
     supplierSingleConstant = new StringUtf8DictionaryEncodedColumnSupplier<>(
         GenericIndexed.fromIterable(
             ImmutableList.of(ByteBuffer.wrap(StringUtils.toUtf8("value"))),
             GenericIndexed.UTF8_STRATEGY
         )::singleThreaded,
         () -> VSizeColumnarInts.fromArray(new int[]{0}),
-        null
+        null,
+        bitmapFactory
     );
     supplierSingle = new StringUtf8DictionaryEncodedColumnSupplier<>(
         GenericIndexed.fromIterable(
@@ -64,7 +67,8 @@ public class ValueMatchersTest extends 
InitializedNullHandlingTest
             GenericIndexed.UTF8_STRATEGY
         )::singleThreaded,
         () -> VSizeColumnarInts.fromArray(new int[]{0, 0, 1, 0, 1}),
-        null
+        null,
+        bitmapFactory
     );
     supplierMulti = new StringUtf8DictionaryEncodedColumnSupplier<>(
         GenericIndexed.fromIterable(
@@ -77,7 +81,8 @@ public class ValueMatchersTest extends 
InitializedNullHandlingTest
                 VSizeColumnarInts.fromArray(new int[]{0, 0}),
                 VSizeColumnarInts.fromArray(new int[]{0})
             )
-        )
+        ),
+        bitmapFactory
     );
   }
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to