This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 31.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/31.0.1 by this push:
     new c9aac1bb405 projection segment merge fixes (#17460) (#17503)
c9aac1bb405 is described below

commit c9aac1bb4058acd335fde36e55d3a9985f329b94
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Nov 22 14:58:35 2024 -0800

    projection segment merge fixes (#17460) (#17503)
    
    changes:
    * fix issue when merging projections from multiple-incremental persists 
which was hoping that some 'dim conversion' buffers were not closed, but they 
already were (by the merging iterator). fix involves selectively persisting 
these conversion buffers to temp files in the segment write out directory and 
mapping them and tying them to the segment level closer so that they are 
available after the lifetime of the parent merger
    * modify auto column serializers to use segment write out directory for 
temp files instead of java.io.tmpdir
    * fix queryable index projection to not put the time-like column as a 
dimension, instead only adding it as __time
    * use smoosh for temp files so can safely write any Serializer to a temp 
smoosh
---
 .../apache/druid/common/utils/SerializerUtils.java |   1 -
 .../apache/druid/segment/AutoTypeColumnMerger.java |   7 +-
 .../segment/DictionaryEncodedColumnMerger.java     | 163 ++++++++++++-
 .../org/apache/druid/segment/DimensionHandler.java |  44 +++-
 .../apache/druid/segment/DimensionMergerV9.java    |  19 +-
 .../druid/segment/DoubleDimensionHandler.java      |   2 +
 .../druid/segment/FloatDimensionHandler.java       |   2 +
 .../java/org/apache/druid/segment/IndexIO.java     |   1 +
 .../org/apache/druid/segment/IndexMergerV9.java    |  15 ++
 .../apache/druid/segment/LongDimensionHandler.java |   2 +
 .../segment/NestedCommonFormatColumnHandler.java   |   4 +-
 .../druid/segment/NestedDataColumnHandlerV4.java   |   2 +
 .../apache/druid/segment/SimpleQueryableIndex.java |  22 +-
 .../druid/segment/StringDimensionHandler.java      |   3 +
 .../druid/segment/StringDimensionMergerV9.java     |   4 +-
 .../druid/segment/nested/DictionaryIdLookup.java   | 256 ++++++---------------
 .../nested/NestedCommonFormatColumnSerializer.java |  10 +-
 .../segment/nested/NestedDataColumnSerializer.java |  36 +--
 .../nested/NestedDataColumnSerializerV4.java       |   2 +-
 .../nested/ScalarDoubleColumnSerializer.java       |  10 +-
 .../segment/nested/ScalarLongColumnSerializer.java |  10 +-
 .../nested/ScalarStringColumnSerializer.java       |   6 +-
 .../segment/nested/VariantColumnSerializer.java    |  28 +--
 .../druid/segment/serde/ColumnSerializerUtils.java |  33 ++-
 .../org/apache/druid/segment/serde/Serializer.java |   7 +-
 .../apache/druid/segment/IndexMergerTestBase.java  | 235 +++++++++++++++++++
 .../segment/nested/DictionaryIdLookupTest.java     |   7 +-
 .../nested/NestedDataColumnSupplierTest.java       |   2 +-
 .../nested/ScalarDoubleColumnSupplierTest.java     |   2 +-
 .../nested/ScalarLongColumnSupplierTest.java       |   2 +-
 .../nested/ScalarStringColumnSupplierTest.java     |   2 +-
 .../segment/nested/VariantColumnSupplierTest.java  |   2 +-
 32 files changed, 667 insertions(+), 274 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java 
b/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java
index 73b9d1cd447..78663d49080 100644
--- 
a/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/common/utils/SerializerUtils.java
@@ -33,7 +33,6 @@ import java.nio.channels.WritableByteChannel;
 
 public class SerializerUtils
 {
-
   public <T extends OutputStream> void writeString(T out, String name) throws 
IOException
   {
     byte[] nameBytes = StringUtils.toUtf8(name);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
index c2bba50b3d8..0f8ea4df145 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
@@ -46,6 +46,7 @@ import 
org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteOrder;
 import java.nio.IntBuffer;
@@ -87,6 +88,8 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
   private boolean isVariantType = false;
   private byte variantTypeByte = 0x00;
 
+  private final File segmentBaseDir;
+
   /**
    * @param name                  column name
    * @param outputName            output smoosh file name. if this is a base 
table column, it will be the equivalent to
@@ -105,6 +108,7 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
       @Nullable ColumnType castToType,
       IndexSpec indexSpec,
       SegmentWriteOutMedium segmentWriteOutMedium,
+      File segmentBaseDir,
       Closer closer
   )
   {
@@ -114,6 +118,7 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
     this.castToType = castToType;
     this.indexSpec = indexSpec;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.segmentBaseDir = segmentBaseDir;
     this.closer = closer;
   }
 
@@ -265,7 +270,7 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
         );
       }
 
-      serializer.openDictionaryWriter();
+      serializer.openDictionaryWriter(segmentBaseDir);
       serializer.serializeFields(mergedFields);
 
       int stringCardinality;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
index a498e081b2c..6859715cf38 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
@@ -26,10 +26,13 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.filter.DruidPredicateFactory;
 import org.apache.druid.query.filter.ValueMatcher;
@@ -51,13 +54,21 @@ import 
org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
 import 
org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
 import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
+import org.apache.druid.segment.serde.ColumnSerializerUtils;
+import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.IntBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -101,6 +112,16 @@ public abstract class DictionaryEncodedColumnMerger<T 
extends Comparable<T>> imp
   @Nullable
   protected T firstDictionaryValue;
 
+  protected File segmentBaseDir;
+
+  /**
+   * This becomes non-null if {@link #markAsParent()} is called indicating 
that this column is a base table 'parent'
+   * to some projection column, which requires persisting id conversion 
buffers to a temporary files. If there are no
+   * projections defined (or projections which reference this column) then id 
conversion buffers will be freed after
+   * calling {@link #writeIndexes(List)}
+   */
+  @MonotonicNonNull
+  protected PersistedIdConversions persistedIdConversions;
 
   public DictionaryEncodedColumnMerger(
       String dimensionName,
@@ -109,6 +130,7 @@ public abstract class DictionaryEncodedColumnMerger<T 
extends Comparable<T>> imp
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
@@ -118,8 +140,8 @@ public abstract class DictionaryEncodedColumnMerger<T 
extends Comparable<T>> imp
     this.capabilities = capabilities;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.nullRowsBitmap = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-
     this.progress = progress;
+    this.segmentBaseDir = segmentBaseDir;
     this.closer = closer;
   }
 
@@ -129,6 +151,19 @@ public abstract class DictionaryEncodedColumnMerger<T 
extends Comparable<T>> imp
   @Nullable
   protected abstract T coerceValue(T value);
 
+  @Override
+  public void markAsParent()
+  {
+    final File tmpOutputFilesDir = new File(segmentBaseDir, "tmp_" + 
outputName + "_merger");
+    try {
+      FileUtils.mkdirp(tmpOutputFilesDir);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    persistedIdConversions = closer.register(new 
PersistedIdConversions(tmpOutputFilesDir));
+  }
+
   @Override
   public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
   {
@@ -192,7 +227,18 @@ public abstract class DictionaryEncodedColumnMerger<T 
extends Comparable<T>> imp
       writeDictionary(() -> dictionaryMergeIterator);
       for (int i = 0; i < adapters.size(); i++) {
         if (dimValueLookups[i] != null && 
dictionaryMergeIterator.needConversion(i)) {
-          dimConversions.set(i, dictionaryMergeIterator.conversions[i]);
+          final IntBuffer conversionBuffer;
+          if (persistedIdConversions != null) {
+            // if we are a projection parent column, persist the id mapping 
buffer so that child mergers have access
+            // to the mappings during serialization to adjust their dictionary 
ids as needed when serializing
+            conversionBuffer = persistedIdConversions.map(
+                dimensionName + "_idConversions_" + i,
+                dictionaryMergeIterator.conversions[i]
+            );
+          } else {
+            conversionBuffer = dictionaryMergeIterator.conversions[i];
+          }
+          dimConversions.set(i, conversionBuffer);
         }
       }
       cardinality = dictionaryMergeIterator.getCardinality();
@@ -702,4 +748,117 @@ public abstract class DictionaryEncodedColumnMerger<T 
extends Comparable<T>> imp
     void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws 
IOException;
     void write() throws IOException;
   }
+
+  protected static class IdConversionSerializer implements Serializer
+  {
+    private final IntBuffer buffer;
+    private final ByteBuffer scratch;
+
+    protected IdConversionSerializer(IntBuffer buffer)
+    {
+      this.buffer = buffer.asReadOnlyBuffer();
+      this.buffer.position(0);
+      this.scratch = 
ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
+    }
+
+    @Override
+    public long getSerializedSize()
+    {
+      return (long) buffer.capacity() * Integer.BYTES;
+    }
+
+    @Override
+    public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
+    {
+      // currently no support for id conversion buffers larger than 2gb
+      buffer.position(0);
+      while (buffer.remaining() > 0) {
+        scratch.position(0);
+        scratch.putInt(buffer.get());
+        scratch.flip();
+        channel.write(scratch);
+      }
+    }
+  }
+
+  /**
+   * Closer of {@link PersistedIdConversion} and a parent path which they are 
stored in for easy cleanup when the
+   * segment is closed.
+   */
+  protected static class PersistedIdConversions implements Closeable
+  {
+    private final File tempDir;
+    private final Closer closer;
+
+    protected PersistedIdConversions(File tempDir)
+    {
+      this.tempDir = tempDir;
+      this.closer = Closer.create();
+    }
+
+    @Nullable
+    public IntBuffer map(String name, IntBuffer intBuffer) throws IOException
+    {
+      final File bufferDir = new File(tempDir, name);
+      FileUtils.mkdirp(bufferDir);
+      final IdConversionSerializer serializer = new 
IdConversionSerializer(intBuffer);
+      return closer.register(new PersistedIdConversion(bufferDir, 
serializer)).getBuffer();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      try {
+        closer.close();
+      }
+      finally {
+        FileUtils.deleteDirectory(tempDir);
+      }
+    }
+  }
+
+  /**
+   * Peristent dictionary id conversion mappings, artifacts created during 
segment merge which map old dictionary ids
+   * to new dictionary ids. These persistent mappings are only used when the 
id mapping needs a lifetime longer than
+   * the merge of the column itself, such as when the column being merged is a 
'parent' column of a projection.
+   *
+   * @see DimensionMergerV9#markAsParent()
+   * @see DimensionMergerV9#attachParent(DimensionMergerV9, List)
+   */
+  protected static class PersistedIdConversion implements Closeable
+  {
+    private final File idConversionFile;
+    private final SmooshedFileMapper bufferMapper;
+    private final IntBuffer buffer;
+
+    private boolean isClosed;
+
+    protected PersistedIdConversion(File idConversionDir, Serializer 
idConversionSerializer) throws IOException
+    {
+      this.idConversionFile = idConversionDir;
+      this.bufferMapper = ColumnSerializerUtils.mapSerializer(idConversionDir, 
idConversionSerializer, idConversionDir.getName());
+      final ByteBuffer mappedBuffer = 
bufferMapper.mapFile(idConversionDir.getName());
+      mappedBuffer.order(ByteOrder.nativeOrder());
+      this.buffer = mappedBuffer.asIntBuffer();
+    }
+
+    @Nullable
+    public IntBuffer getBuffer()
+    {
+      if (isClosed) {
+        return null;
+      }
+      return buffer;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      if (!isClosed) {
+        isClosed = true;
+        bufferMapper.close();
+        FileUtils.deleteDirectory(idConversionFile);
+      }
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java 
b/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
index 4ffc7ee04c4..689dfde16cd 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionHandler.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment;
 
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.DimensionSpec;
@@ -29,6 +30,7 @@ import 
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.util.Comparator;
 
 /**
@@ -100,6 +102,28 @@ public interface DimensionHandler
    */
   DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType> 
makeIndexer(boolean useMaxMemoryEstimates);
 
+  /**
+   * @deprecated use {@link #makeMerger(String, IndexSpec, 
SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer)}
+   *
+   * This method exists for backwards compatiblity with older versions of 
Druid since this is an unofficial extension
+   * point that must be implemented to create custom dimension types, and will 
be removed in a future release.
+   */
+  @Deprecated
+  default DimensionMergerV9 makeMerger(
+      String outputName,
+      IndexSpec indexSpec,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      ColumnCapabilities capabilities,
+      ProgressIndicator progress,
+      Closer closer
+  )
+  {
+    throw DruidException.defensive(
+        "this method is no longer supported, use makeMerger(String, IndexSpec, 
SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer) 
instead"
+    );
+  }
+
+
   /**
    * Creates a new DimensionMergerV9, a per-dimension object responsible for 
merging indexes/row data across segments
    * and building the on-disk representation of a dimension. For use with 
IndexMergerV9 only.
@@ -113,16 +137,32 @@ public interface DimensionHandler
    *                              needed
    * @param capabilities          The ColumnCapabilities of the dimension 
represented by this DimensionHandler
    * @param progress              ProgressIndicator used by the merging process
+   * @param segmentBaseDir        segment write out path; temporary files may 
be created here, though should delete
+   *                              after merge is finished OR be registered 
with the Closer parameter
+   * @param closer                Closer tied to segment completion. Anything 
which is not cleaned up inside of the
+   *                              merger after merge is complete should be 
registered with this closer. For example,
+   *                              resources which are required for final 
serialization of the column
    * @return A new DimensionMergerV9 object.
    */
-  DimensionMergerV9 makeMerger(
+  default DimensionMergerV9 makeMerger(
       String outputName,
       IndexSpec indexSpec,
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
-  );
+  )
+  {
+    return makeMerger(
+        outputName,
+        indexSpec,
+        segmentWriteOutMedium,
+        capabilities,
+        progress,
+        closer
+    );
+  }
 
   /**
    * Given an key component representing a single set of row value(s) for this 
dimension as an Object,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java 
b/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java
index 5e3f5cad886..26802745ca1 100644
--- a/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/DimensionMergerV9.java
@@ -39,12 +39,27 @@ public interface DimensionMergerV9 extends DimensionMerger
    */
   ColumnDescriptor makeColumnDescriptor();
 
+  /**
+   * Sets this merger as the "parent" of another merger for a "projection", 
allowing for this merger to preserve any
+   * state which might be required for the projection mergers to do their 
thing. This method MUST be called prior to
+   * performing any merge work. Typically, this method is only implemented if
+   * {@link #attachParent(DimensionMergerV9, List)} requires it.
+   */
+  default void markAsParent()
+  {
+    // do nothing
+  }
+
   /**
    * Attaches the {@link DimensionMergerV9} of a "projection" parent column so 
that stuff like value dictionaries can
-   * be shared between parent and child
+   * be shared between parent and child. This method is called during merging 
instead of {@link #writeMergedValueDictionary(List)} if
+   * the parent column exists.
+   * 
+   * @see IndexMergerV9#makeProjections 
    */
   default void attachParent(DimensionMergerV9 parent, List<IndexableAdapter> 
projectionAdapters) throws IOException
   {
-    // do nothing
+    // by default fall through to writing merged dictionary
+    writeMergedValueDictionary(projectionAdapters);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java 
b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
index 035384e522b..e8b9b9a2c9c 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionHandler.java
@@ -30,6 +30,7 @@ import 
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.selector.settable.SettableDoubleColumnValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import java.io.File;
 import java.util.Comparator;
 
 public class DoubleDimensionHandler implements DimensionHandler<Double, 
Double, Double>
@@ -82,6 +83,7 @@ public class DoubleDimensionHandler implements 
DimensionHandler<Double, Double,
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java 
b/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
index 925fefaa9be..0defd19c0a9 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/FloatDimensionHandler.java
@@ -30,6 +30,7 @@ import 
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.selector.settable.SettableFloatColumnValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import java.io.File;
 import java.util.Comparator;
 
 public class FloatDimensionHandler implements DimensionHandler<Float, Float, 
Float>
@@ -82,6 +83,7 @@ public class FloatDimensionHandler implements 
DimensionHandler<Float, Float, Flo
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java 
b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index 9cab1ce1928..8470c63f3e7 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -719,6 +719,7 @@ public class IndexIO
 
         if 
(groupingColumn.equals(projectionSpec.getSchema().getTimeColumnName())) {
           projectionColumns.put(ColumnHolder.TIME_COLUMN_NAME, 
projectionColumns.get(groupingColumn));
+          projectionColumns.remove(groupingColumn);
         }
       }
       for (AggregatorFactory aggregator : 
projectionSpec.getSchema().getAggregators()) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 04cc579208d..7da89edaff7 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -227,12 +227,24 @@ public class IndexMergerV9 implements IndexMerger
             segmentWriteOutMedium,
             dimFormats.get(i).toColumnCapabilities(),
             progress,
+            outDir,
             closer
         );
         mergers.add(merger);
         mergersMap.put(mergedDimensions.get(i), merger);
       }
 
+      if (segmentMetadata != null && segmentMetadata.getProjections() != null) 
{
+        for (AggregateProjectionMetadata projectionMetadata : 
segmentMetadata.getProjections()) {
+          for (String dimension : 
projectionMetadata.getSchema().getGroupingColumns()) {
+            DimensionMergerV9 merger = mergersMap.get(dimension);
+            if (merger != null) {
+              merger.markAsParent();
+            }
+          }
+        }
+      }
+
       /************* Setup Dim Conversions **************/
       progress.progress();
       startTime = System.currentTimeMillis();
@@ -302,6 +314,7 @@ public class IndexMergerV9 implements IndexMerger
             indexSpec,
             segmentWriteOutMedium,
             progress,
+            outDir,
             closer,
             mergersMap,
             segmentMetadata
@@ -356,6 +369,7 @@ public class IndexMergerV9 implements IndexMerger
       final IndexSpec indexSpec,
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final ProgressIndicator progress,
+      final File segmentBaseDir,
       final Closer closer,
       final Map<String, DimensionMergerV9> parentMergers,
       final Metadata segmentMetadata
@@ -389,6 +403,7 @@ public class IndexMergerV9 implements IndexMerger
             segmentWriteOutMedium,
             dimensionFormat.toColumnCapabilities(),
             progress,
+            segmentBaseDir,
             closer
         );
         if (parentMergers.containsKey(dimension)) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java 
b/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
index e93a71be883..928bd8e2b13 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/LongDimensionHandler.java
@@ -30,6 +30,7 @@ import 
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.selector.settable.SettableLongColumnValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import java.io.File;
 import java.util.Comparator;
 
 public class LongDimensionHandler implements DimensionHandler<Long, Long, Long>
@@ -82,6 +83,7 @@ public class LongDimensionHandler implements 
DimensionHandler<Long, Long, Long>
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
 
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
index 348d40e4f2d..ab3f9053bf1 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/NestedCommonFormatColumnHandler.java
@@ -31,6 +31,7 @@ import 
org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelec
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.util.Comparator;
 
 public class NestedCommonFormatColumnHandler implements 
DimensionHandler<StructuredData, StructuredData, StructuredData>
@@ -82,10 +83,11 @@ public class NestedCommonFormatColumnHandler implements 
DimensionHandler<Structu
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
-    return new AutoTypeColumnMerger(name, outputName, castTo, indexSpec, 
segmentWriteOutMedium, closer);
+    return new AutoTypeColumnMerger(name, outputName, castTo, indexSpec, 
segmentWriteOutMedium, segmentBaseDir, closer);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
 
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
index d74933faaf1..effb74f8c32 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java
@@ -30,6 +30,7 @@ import 
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import java.io.File;
 import java.util.Comparator;
 
 public class NestedDataColumnHandlerV4 implements 
DimensionHandler<StructuredData, StructuredData, StructuredData>
@@ -78,6 +79,7 @@ public class NestedDataColumnHandlerV4 implements 
DimensionHandler<StructuredDat
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java 
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
index 6b48d304b80..ed67d482622 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java
@@ -44,6 +44,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
+import java.util.stream.Collectors;
 
 /**
  *
@@ -239,21 +240,36 @@ public abstract class SimpleQueryableIndex implements 
QueryableIndex
   public QueryableIndex getProjectionQueryableIndex(String name)
   {
     final AggregateProjectionMetadata projectionSpec = 
projectionsMap.get(name);
+    final Metadata projectionMetadata = new Metadata(
+        null,
+        projectionSpec.getSchema().getAggregators(),
+        null,
+        null,
+        true,
+        projectionSpec.getSchema().getOrderingWithTimeColumnSubstitution(),
+        null
+    );
     return new SimpleQueryableIndex(
         dataInterval,
-        new ListIndexed<>(projectionSpec.getSchema().getGroupingColumns()),
+        new ListIndexed<>(
+            projectionSpec.getSchema()
+                          .getGroupingColumns()
+                          .stream()
+                          .filter(x -> 
!x.equals(projectionSpec.getSchema().getTimeColumnName()))
+                          .collect(Collectors.toList())
+        ),
         bitmapFactory,
         projectionColumns.get(name),
         fileMapper,
         true,
-        null,
+        projectionMetadata,
         null
     )
     {
       @Override
       public Metadata getMetadata()
       {
-        return null;
+        return projectionMetadata;
       }
 
       @Override
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
index 3bb1e721727..37e62bad64c 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
@@ -32,6 +32,7 @@ import 
org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.selector.settable.SettableDimensionValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.Comparator;
 
@@ -169,6 +170,7 @@ public class StringDimensionHandler implements 
DimensionHandler<Integer, int[],
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
@@ -188,6 +190,7 @@ public class StringDimensionHandler implements 
DimensionHandler<Integer, int[],
         segmentWriteOutMedium,
         capabilities,
         progress,
+        segmentBaseDir,
         closer
     );
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index 8c71d418556..b0f9c6f466a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -48,6 +48,7 @@ import 
org.apache.druid.segment.serde.DictionaryEncodedColumnPartSerde;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
@@ -84,10 +85,11 @@ public class StringDimensionMergerV9 extends 
DictionaryEncodedColumnMerger<Strin
       SegmentWriteOutMedium segmentWriteOutMedium,
       ColumnCapabilities capabilities,
       ProgressIndicator progress,
+      File segmentBaseDir,
       Closer closer
   )
   {
-    super(dimensionName, outputName, indexSpec, segmentWriteOutMedium, 
capabilities, progress, closer);
+    super(dimensionName, outputName, indexSpec, segmentWriteOutMedium, 
capabilities, progress, segmentBaseDir, closer);
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
index 4569df39ef6..c22dfde831f 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/DictionaryIdLookup.java
@@ -19,15 +19,11 @@
 
 package org.apache.druid.segment.nested;
 
-import com.google.common.primitives.Ints;
-import org.apache.druid.annotations.SuppressFBWarnings;
 import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.ByteBufferUtils;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 import org.apache.druid.segment.column.StringEncodingStrategies;
 import org.apache.druid.segment.column.TypeStrategies;
 import org.apache.druid.segment.data.DictionaryWriter;
@@ -35,6 +31,7 @@ import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.serde.ColumnSerializerUtils;
+import org.apache.druid.utils.CloseableUtils;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
@@ -42,13 +39,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.EnumSet;
 
 /**
  * Value to dictionary id lookup, backed with memory mapped dictionaries 
populated lazily by the supplied
@@ -56,36 +46,38 @@ import java.util.EnumSet;
  */
 public final class DictionaryIdLookup implements Closeable
 {
+
   private final String name;
-  private final Path tempBasePath;
+  private final File tempBasePath;
 
   @Nullable
   private final DictionaryWriter<String> stringDictionaryWriter;
-  private Path stringDictionaryFile = null;
+  private File stringDictionaryFile = null;
   private SmooshedFileMapper stringBufferMapper = null;
   private Indexed<ByteBuffer> stringDictionary = null;
 
   @Nullable
   private final DictionaryWriter<Long> longDictionaryWriter;
-  private Path longDictionaryFile = null;
-  private MappedByteBuffer longBuffer = null;
+  private File longDictionaryFile = null;
+  private SmooshedFileMapper longBufferMapper = null;
   private FixedIndexed<Long> longDictionary = null;
 
   @Nullable
   private final DictionaryWriter<Double> doubleDictionaryWriter;
-  private Path doubleDictionaryFile = null;
-  MappedByteBuffer doubleBuffer = null;
+  private File doubleDictionaryFile = null;
+  SmooshedFileMapper doubleBufferMapper = null;
   FixedIndexed<Double> doubleDictionary = null;
 
   @Nullable
   private final DictionaryWriter<int[]> arrayDictionaryWriter;
-  private Path arrayDictionaryFile = null;
-  private MappedByteBuffer arrayBuffer = null;
+  private File arrayDictionaryFile = null;
+  private SmooshedFileMapper arrayBufferMapper = null;
   private FrontCodedIntArrayIndexed arrayDictionary = null;
+  private final Closer closer = Closer.create();
 
   public DictionaryIdLookup(
       String name,
-      Path tempBasePath,
+      File tempBaseDir,
       @Nullable DictionaryWriter<String> stringDictionaryWriter,
       @Nullable DictionaryWriter<Long> longDictionaryWriter,
       @Nullable DictionaryWriter<Double> doubleDictionaryWriter,
@@ -93,7 +85,7 @@ public final class DictionaryIdLookup implements Closeable
   )
   {
     this.name = name;
-    this.tempBasePath = tempBasePath;
+    this.tempBasePath = tempBaseDir;
     this.stringDictionaryWriter = stringDictionaryWriter;
     this.longDictionaryWriter = longDictionaryWriter;
     this.doubleDictionaryWriter = doubleDictionaryWriter;
@@ -172,42 +164,27 @@ public final class DictionaryIdLookup implements Closeable
   }
 
   @Nullable
-  public ByteBuffer getLongBuffer()
+  public SmooshedFileMapper getLongBufferMapper()
   {
-    return longBuffer;
+    return longBufferMapper;
   }
 
   @Nullable
-  public ByteBuffer getDoubleBuffer()
+  public SmooshedFileMapper getDoubleBufferMapper()
   {
-    return doubleBuffer;
+    return doubleBufferMapper;
   }
 
   @Nullable
-  public ByteBuffer getArrayBuffer()
+  public SmooshedFileMapper getArrayBufferMapper()
   {
-    return arrayBuffer;
+    return arrayBufferMapper;
   }
 
   @Override
   public void close()
   {
-    if (stringBufferMapper != null) {
-      stringBufferMapper.close();
-      deleteTempFile(stringDictionaryFile);
-    }
-    if (longBuffer != null) {
-      ByteBufferUtils.unmap(longBuffer);
-      deleteTempFile(longDictionaryFile);
-    }
-    if (doubleBuffer != null) {
-      ByteBufferUtils.unmap(doubleBuffer);
-      deleteTempFile(doubleDictionaryFile);
-    }
-    if (arrayBuffer != null) {
-      ByteBufferUtils.unmap(arrayBuffer);
-      deleteTempFile(arrayDictionaryFile);
-    }
+    CloseableUtils.closeAndWrapExceptions(closer);
   }
 
   private int longOffset()
@@ -228,28 +205,16 @@ public final class DictionaryIdLookup implements Closeable
   private void ensureStringDictionaryLoaded()
   {
     if (stringDictionary == null) {
-      // GenericIndexed v2 can write to multiple files if the dictionary is 
larger than 2gb, so we use a smooshfile
-      // for strings because of this. if other type dictionary writers could 
potentially use multiple internal files
-      // in the future, we should transition them to using this approach as 
well (or build a combination smoosher and
-      // mapper so that we can have a mutable smoosh)
-      File stringSmoosh = FileUtils.createTempDirInLocation(tempBasePath, 
StringUtils.urlEncode(name) + "__stringTempSmoosh");
-      stringDictionaryFile = stringSmoosh.toPath();
       final String fileName = ColumnSerializerUtils.getInternalFileName(
           name,
           ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME
       );
+      stringDictionaryFile = makeTempDir(fileName);
+      stringBufferMapper = closer.register(
+          ColumnSerializerUtils.mapSerializer(stringDictionaryFile, 
stringDictionaryWriter, fileName)
+      );
 
-      try (
-          final FileSmoosher smoosher = new FileSmoosher(stringSmoosh);
-          final SmooshedWriter writer = smoosher.addWithSmooshedWriter(
-              fileName,
-              stringDictionaryWriter.getSerializedSize()
-          )
-      ) {
-        stringDictionaryWriter.writeTo(writer, smoosher);
-        writer.close();
-        smoosher.close();
-        stringBufferMapper = SmooshedFileMapper.load(stringSmoosh);
+      try {
         final ByteBuffer stringBuffer = stringBufferMapper.mapFile(fileName);
         stringDictionary = 
StringEncodingStrategies.getStringDictionarySupplier(
             stringBufferMapper,
@@ -266,148 +231,79 @@ public final class DictionaryIdLookup implements 
Closeable
   private void ensureLongDictionaryLoaded()
   {
     if (longDictionary == null) {
-      longDictionaryFile = makeTempFile(name + 
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
-      longBuffer = mapWriter(longDictionaryFile, longDictionaryWriter);
-      longDictionary = FixedIndexed.read(longBuffer, TypeStrategies.LONG, 
ByteOrder.nativeOrder(), Long.BYTES).get();
-      // reset position
-      longBuffer.position(0);
+      final String fileName = ColumnSerializerUtils.getInternalFileName(
+          name,
+          ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
+      );
+      longDictionaryFile = makeTempDir(fileName);
+      longBufferMapper = closer.register(
+          ColumnSerializerUtils.mapSerializer(longDictionaryFile, 
longDictionaryWriter, fileName)
+      );
+      try {
+        final ByteBuffer buffer = longBufferMapper.mapFile(fileName);
+        longDictionary = FixedIndexed.read(buffer, TypeStrategies.LONG, 
ByteOrder.nativeOrder(), Long.BYTES).get();
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
   private void ensureDoubleDictionaryLoaded()
   {
     if (doubleDictionary == null) {
-      doubleDictionaryFile = makeTempFile(name + 
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
-      doubleBuffer = mapWriter(doubleDictionaryFile, doubleDictionaryWriter);
-      doubleDictionary = FixedIndexed.read(
-          doubleBuffer,
-          TypeStrategies.DOUBLE,
-          ByteOrder.nativeOrder(),
-          Double.BYTES
-      ).get();
-      // reset position
-      doubleBuffer.position(0);
+      final String fileName = ColumnSerializerUtils.getInternalFileName(
+          name,
+          ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
+      );
+      doubleDictionaryFile = makeTempDir(fileName);
+      doubleBufferMapper = closer.register(
+          ColumnSerializerUtils.mapSerializer(doubleDictionaryFile, 
doubleDictionaryWriter, fileName)
+      );
+      try {
+        final ByteBuffer buffer = doubleBufferMapper.mapFile(fileName);
+        doubleDictionary = FixedIndexed.read(buffer, TypeStrategies.DOUBLE, 
ByteOrder.nativeOrder(), Long.BYTES).get();
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
   private void ensureArrayDictionaryLoaded()
   {
     if (arrayDictionary == null && arrayDictionaryWriter != null) {
-      arrayDictionaryFile = makeTempFile(name + 
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
-      arrayBuffer = mapWriter(arrayDictionaryFile, arrayDictionaryWriter);
-      arrayDictionary = FrontCodedIntArrayIndexed.read(arrayBuffer, 
ByteOrder.nativeOrder()).get();
-      // reset position
-      arrayBuffer.position(0);
-    }
-  }
-
-  private Path makeTempFile(String name)
-  {
-    try {
-      return Files.createTempFile(tempBasePath, StringUtils.urlEncode(name), 
null);
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void deleteTempFile(Path path)
-  {
-    try {
-      final File file = path.toFile();
-      if (file.isDirectory()) {
-        FileUtils.deleteDirectory(file);
-      } else {
-        Files.delete(path);
+      final String fileName = ColumnSerializerUtils.getInternalFileName(
+          name,
+          ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
+      );
+      arrayDictionaryFile = makeTempDir(fileName);
+      arrayBufferMapper = closer.register(
+          ColumnSerializerUtils.mapSerializer(arrayDictionaryFile, 
arrayDictionaryWriter, fileName)
+      );
+      try {
+        final ByteBuffer buffer = arrayBufferMapper.mapFile(fileName);
+        arrayDictionary = FrontCodedIntArrayIndexed.read(buffer, 
ByteOrder.nativeOrder()).get();
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
       }
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
     }
   }
 
-  @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
-  private MappedByteBuffer mapWriter(Path path, DictionaryWriter<?> writer)
+  private File makeTempDir(String fileName)
   {
-    final EnumSet<StandardOpenOption> options = EnumSet.of(
-        StandardOpenOption.READ,
-        StandardOpenOption.WRITE,
-        StandardOpenOption.CREATE,
-        StandardOpenOption.TRUNCATE_EXISTING
-    );
-
-    try (FileChannel fileChannel = FileChannel.open(path, options);
-         GatheringByteChannel smooshChannel = makeWriter(fileChannel, 
writer.getSerializedSize())) {
-      //noinspection DataFlowIssue
-      writer.writeTo(smooshChannel, null);
-      return fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 
writer.getSerializedSize());
+    try {
+      final File f = new File(tempBasePath, StringUtils.urlEncode(fileName));
+      FileUtils.mkdirp(f);
+      closer.register(() -> FileUtils.deleteDirectory(f));
+      return f;
     }
     catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
-  private GatheringByteChannel makeWriter(FileChannel channel, long size)
-  {
-    // basically same code as smooshed writer, can't use channel directly 
because copying between channels
-    // doesn't handle size of source channel correctly
-    return new GatheringByteChannel()
-    {
-      private boolean isClosed = false;
-      private long currOffset = 0;
-
-      @Override
-      public boolean isOpen()
-      {
-        return !isClosed;
-      }
-
-      @Override
-      public void close() throws IOException
-      {
-        channel.close();
-        isClosed = true;
-      }
-
-      public int bytesLeft()
-      {
-        return (int) (size - currOffset);
-      }
-
-      @Override
-      public int write(ByteBuffer buffer) throws IOException
-      {
-        return addToOffset(channel.write(buffer));
-      }
-
-      @Override
-      public long write(ByteBuffer[] srcs, int offset, int length) throws 
IOException
-      {
-        return addToOffset(channel.write(srcs, offset, length));
-      }
-
-      @Override
-      public long write(ByteBuffer[] srcs) throws IOException
-      {
-        return addToOffset(channel.write(srcs));
-      }
-
-      public int addToOffset(long numBytesWritten)
-      {
-        if (numBytesWritten > bytesLeft()) {
-          throw DruidException.defensive(
-              "Wrote more bytes[%,d] than available[%,d]. Don't do that.",
-              numBytesWritten,
-              bytesLeft()
-          );
-        }
-        currOffset += numBytesWritten;
-
-        return Ints.checkedCast(numBytesWritten);
-      }
-    };
-  }
-
   public int getStringCardinality()
   {
     ensureStringDictionaryLoaded();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
index 50bca997735..32bf670b003 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java
@@ -21,11 +21,13 @@ package org.apache.druid.segment.nested;
 
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.data.VByte;
 import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.serde.Serializer;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -53,7 +55,7 @@ public abstract class NestedCommonFormatColumnSerializer 
implements GenericColum
   public static final String RAW_FILE_NAME = "__raw";
   public static final String NESTED_FIELD_PREFIX = "__field_";
 
-  public abstract void openDictionaryWriter() throws IOException;
+  public abstract void openDictionaryWriter(File segmentBaseDir) throws 
IOException;
 
   public void serializeFields(SortedMap<String, FieldTypeInfo.MutableTypeSet> 
fields) throws IOException
   {
@@ -80,9 +82,11 @@ public abstract class NestedCommonFormatColumnSerializer 
implements GenericColum
     ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(), 
fileName);
   }
 
-  protected void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, 
String fileName) throws IOException
+  protected void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper 
fileMapper) throws IOException
   {
-    ColumnSerializerUtils.writeInternal(smoosher, buffer, getColumnName(), 
fileName);
+    for (String internalName : fileMapper.getInternalFilenames()) {
+      smoosher.add(internalName, fileMapper.mapFile(internalName));
+    }
   }
 
   protected void writeV0Header(WritableByteChannel channel, ByteBuffer 
columnNameBuffer) throws IOException
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 6a3405e58fc..f7c2cbc1b59 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -24,12 +24,10 @@ import com.google.common.collect.Maps;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprEval;
@@ -51,6 +49,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -201,7 +200,7 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
   }
 
   @Override
-  public void openDictionaryWriter() throws IOException
+  public void openDictionaryWriter(File segmentBaseDir) throws IOException
   {
     fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, 
GenericIndexed.STRING_STRATEGY);
     fieldsWriter.open();
@@ -243,7 +242,7 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
     globalDictionaryIdLookup = closer.register(
         new DictionaryIdLookup(
             name,
-            FileUtils.getTempDir(),
+            segmentBaseDir,
             dictionaryWriter,
             longDictionaryWriter,
             doubleDictionaryWriter,
@@ -440,37 +439,22 @@ public class NestedDataColumnSerializer extends 
NestedCommonFormatColumnSerializ
 
     if (writeDictionary) {
       if (globalDictionaryIdLookup.getStringBufferMapper() != null) {
-        SmooshedFileMapper fileMapper = 
globalDictionaryIdLookup.getStringBufferMapper();
-        for (String internalName : fileMapper.getInternalFilenames()) {
-          smoosher.add(internalName, fileMapper.mapFile(internalName));
-        }
+        copyFromTempSmoosh(smoosher, 
globalDictionaryIdLookup.getStringBufferMapper());
       } else {
         writeInternal(smoosher, dictionaryWriter, 
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
       }
-      if (globalDictionaryIdLookup.getLongBuffer() != null) {
-        writeInternal(
-            smoosher,
-            globalDictionaryIdLookup.getLongBuffer(),
-            ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME
-        );
+      if (globalDictionaryIdLookup.getLongBufferMapper() != null) {
+        copyFromTempSmoosh(smoosher, 
globalDictionaryIdLookup.getLongBufferMapper());
       } else {
         writeInternal(smoosher, longDictionaryWriter, 
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
       }
-      if (globalDictionaryIdLookup.getDoubleBuffer() != null) {
-        writeInternal(
-            smoosher,
-            globalDictionaryIdLookup.getDoubleBuffer(),
-            ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
-        );
+      if (globalDictionaryIdLookup.getDoubleBufferMapper() != null) {
+        copyFromTempSmoosh(smoosher, 
globalDictionaryIdLookup.getDoubleBufferMapper());
       } else {
         writeInternal(smoosher, doubleDictionaryWriter, 
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
       }
-      if (globalDictionaryIdLookup.getArrayBuffer() != null) {
-        writeInternal(
-            smoosher,
-            globalDictionaryIdLookup.getArrayBuffer(),
-            ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME
-        );
+      if (globalDictionaryIdLookup.getArrayBufferMapper() != null) {
+        copyFromTempSmoosh(smoosher, 
globalDictionaryIdLookup.getArrayBufferMapper());
       } else {
         writeInternal(smoosher, arrayDictionaryWriter, 
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
       }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
index bb015ecce92..87240f3fadc 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java
@@ -200,7 +200,7 @@ public class NestedDataColumnSerializerV4 implements 
GenericColumnSerializer<Str
     globalDictionaryIdLookup = closer.register(
         new DictionaryIdLookup(
             name,
-            FileUtils.getTempDir(),
+            FileUtils.getTempDir().toFile(),
             dictionaryWriter,
             longDictionaryWriter,
             doubleDictionaryWriter,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
index 050848f6503..52d798149fd 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment.nested;
 
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
@@ -35,6 +34,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteOrder;
 
@@ -66,7 +66,7 @@ public class ScalarDoubleColumnSerializer extends 
ScalarNestedCommonFormatColumn
   }
 
   @Override
-  public void openDictionaryWriter() throws IOException
+  public void openDictionaryWriter(File segmentBaseDir) throws IOException
   {
     dictionaryWriter = new FixedIndexedWriter<>(
         segmentWriteOutMedium,
@@ -79,7 +79,7 @@ public class ScalarDoubleColumnSerializer extends 
ScalarNestedCommonFormatColumn
     dictionaryIdLookup = closer.register(
         new DictionaryIdLookup(
             name,
-            FileUtils.getTempDir(),
+            segmentBaseDir,
             null,
             null,
             dictionaryWriter,
@@ -136,8 +136,8 @@ public class ScalarDoubleColumnSerializer extends 
ScalarNestedCommonFormatColumn
   @Override
   protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
   {
-    if (dictionaryIdLookup.getDoubleBuffer() != null) {
-      writeInternal(smoosher, dictionaryIdLookup.getDoubleBuffer(), 
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
+    if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
+      copyFromTempSmoosh(smoosher, dictionaryIdLookup.getDoubleBufferMapper());
     } else {
       writeInternal(smoosher, dictionaryWriter, 
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
     }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
index 89408b20388..e62c1ebe8aa 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.segment.nested;
 
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
@@ -35,6 +34,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteOrder;
 
@@ -67,7 +67,7 @@ public class ScalarLongColumnSerializer extends 
ScalarNestedCommonFormatColumnSe
   }
 
   @Override
-  public void openDictionaryWriter() throws IOException
+  public void openDictionaryWriter(File segmentBaseDir) throws IOException
   {
     dictionaryWriter = new FixedIndexedWriter<>(
         segmentWriteOutMedium,
@@ -80,7 +80,7 @@ public class ScalarLongColumnSerializer extends 
ScalarNestedCommonFormatColumnSe
     dictionaryIdLookup = closer.register(
         new DictionaryIdLookup(
             name,
-            FileUtils.getTempDir(),
+            segmentBaseDir,
             null,
             dictionaryWriter,
             null,
@@ -136,8 +136,8 @@ public class ScalarLongColumnSerializer extends 
ScalarNestedCommonFormatColumnSe
   @Override
   protected void writeDictionaryFile(FileSmoosher smoosher) throws IOException
   {
-    if (dictionaryIdLookup.getLongBuffer() != null) {
-      writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), 
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
+    if (dictionaryIdLookup.getLongBufferMapper() != null) {
+      copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
     } else {
       writeInternal(smoosher, dictionaryWriter, 
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
     }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
index 230a9433cdc..5a335a312b4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
@@ -20,7 +20,6 @@
 package org.apache.druid.segment.nested;
 
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
@@ -34,6 +33,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 
 /**
@@ -62,7 +62,7 @@ public class ScalarStringColumnSerializer extends 
ScalarNestedCommonFormatColumn
   }
 
   @Override
-  public void openDictionaryWriter() throws IOException
+  public void openDictionaryWriter(File segmentBaseDir) throws IOException
   {
     dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
         indexSpec.getStringDictionaryEncoding(),
@@ -73,7 +73,7 @@ public class ScalarStringColumnSerializer extends 
ScalarNestedCommonFormatColumn
     dictionaryIdLookup = closer.register(
         new DictionaryIdLookup(
             name,
-            FileUtils.getTempDir(),
+            segmentBaseDir,
             dictionaryWriter,
             null,
             null,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 0d07822db54..9adb8fac2b7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -26,13 +26,11 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExpressionType;
@@ -52,6 +50,7 @@ import org.apache.druid.segment.serde.ColumnSerializerUtils;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -133,7 +132,7 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
   }
 
   @Override
-  public void openDictionaryWriter() throws IOException
+  public void openDictionaryWriter(File segmentBaseDir) throws IOException
   {
     dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
         indexSpec.getStringDictionaryEncoding(),
@@ -171,7 +170,7 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
     dictionaryIdLookup = closer.register(
         new DictionaryIdLookup(
             name,
-            FileUtils.getTempDir(),
+            segmentBaseDir,
             dictionaryWriter,
             longDictionaryWriter,
             doubleDictionaryWriter,
@@ -423,29 +422,22 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
 
     if (writeDictionary) {
       if (dictionaryIdLookup.getStringBufferMapper() != null) {
-        SmooshedFileMapper fileMapper = 
dictionaryIdLookup.getStringBufferMapper();
-        for (String internalName : fileMapper.getInternalFilenames()) {
-          smoosher.add(internalName, fileMapper.mapFile(internalName));
-        }
+        copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getStringBufferMapper());
       } else {
         writeInternal(smoosher, dictionaryWriter, 
ColumnSerializerUtils.STRING_DICTIONARY_FILE_NAME);
       }
-      if (dictionaryIdLookup.getLongBuffer() != null) {
-        writeInternal(smoosher, dictionaryIdLookup.getLongBuffer(), 
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
+      if (dictionaryIdLookup.getLongBufferMapper() != null) {
+        copyFromTempSmoosh(smoosher, dictionaryIdLookup.getLongBufferMapper());
       } else {
         writeInternal(smoosher, longDictionaryWriter, 
ColumnSerializerUtils.LONG_DICTIONARY_FILE_NAME);
       }
-      if (dictionaryIdLookup.getDoubleBuffer() != null) {
-        writeInternal(
-            smoosher,
-            dictionaryIdLookup.getDoubleBuffer(),
-            ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME
-        );
+      if (dictionaryIdLookup.getDoubleBufferMapper() != null) {
+        copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getDoubleBufferMapper());
       } else {
         writeInternal(smoosher, doubleDictionaryWriter, 
ColumnSerializerUtils.DOUBLE_DICTIONARY_FILE_NAME);
       }
-      if (dictionaryIdLookup.getArrayBuffer() != null) {
-        writeInternal(smoosher, dictionaryIdLookup.getArrayBuffer(), 
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
+      if (dictionaryIdLookup.getArrayBufferMapper() != null) {
+        copyFromTempSmoosh(smoosher, 
dictionaryIdLookup.getArrayBufferMapper());
       } else {
         writeInternal(smoosher, arrayDictionaryWriter, 
ColumnSerializerUtils.ARRAY_DICTIONARY_FILE_NAME);
       }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
 
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
index 8396f07fd35..542441bb00b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/serde/ColumnSerializerUtils.java
@@ -25,10 +25,11 @@ import 
com.fasterxml.jackson.dataformat.smile.SmileGenerator;
 import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 
+import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 public class ColumnSerializerUtils
 {
@@ -65,17 +66,31 @@ public class ColumnSerializerUtils
     }
   }
 
-  public static void writeInternal(FileSmoosher smoosher, ByteBuffer buffer, 
String columnName, String fileName)
-      throws IOException
+  public static String getInternalFileName(String fileNameBase, String field)
   {
-    final String internalName = getInternalFileName(columnName, fileName);
-    try (SmooshedWriter smooshChannel = 
smoosher.addWithSmooshedWriter(internalName, buffer.capacity())) {
-      smooshChannel.write(buffer);
-    }
+    return fileNameBase + "." + field;
   }
 
-  public static String getInternalFileName(String fileNameBase, String field)
+  /**
+   * Writes a {@link Serializer} to a 'smoosh file' which contains the 
contents of this single serializer, with the
+   * serializer writing to an internal file specified by the name argument, 
returning a {@link SmooshedFileMapper}
+   */
+  public static SmooshedFileMapper mapSerializer(File smooshFile, Serializer 
writer, String name)
   {
-    return fileNameBase + "." + field;
+    try (
+        final FileSmoosher smoosher = new FileSmoosher(smooshFile);
+        final SmooshedWriter smooshedWriter = smoosher.addWithSmooshedWriter(
+            name,
+            writer.getSerializedSize()
+        )
+    ) {
+      writer.writeTo(smooshedWriter, smoosher);
+      smooshedWriter.close();
+      smoosher.close();
+      return SmooshedFileMapper.load(smooshFile);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java 
b/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java
index 607d17195cd..b6889b148c2 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/Serializer.java
@@ -36,8 +36,11 @@ public interface Serializer
   long getSerializedSize() throws IOException;
 
   /**
-   * Writes serialized form of this object to the given channel. If parallel 
data streams are needed, they could be
-   * created with the provided smoosher.
+   * Writes the serialized form of this object. The entire object may be 
written to the provided channel, or the object
+   * may be split over the provided channel and files added to the {@link 
FileSmoosher], where additional channels can
+   * be created via {@link FileSmoosher#addWithSmooshedWriter(String, long)}. 
The latter approach is useful when the
+   * serialized form of the object is too large for a single smoosh container. 
At the time this javadoc was written,
+   * the max smoosh container size is limit to the max {@link 
java.nio.ByteBuffer} size.
    */
   void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws 
IOException;
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java 
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index 7faa31d8e53..a6893756496 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -30,7 +30,9 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -39,17 +41,23 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.frame.testutil.FrameTestUtil;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.DefaultBitmapResultFactory;
 import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.BitmapValues;
@@ -63,9 +71,11 @@ import 
org.apache.druid.segment.incremental.IncrementalIndexAdapter;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -143,6 +153,7 @@ public class IndexMergerTestBase extends 
InitializedNullHandlingTest
   private final IndexSpec indexSpec;
   private final IndexIO indexIO;
   private final boolean useBitmapIndexes;
+  private final BitmapSerdeFactory serdeFactory;
 
   @Rule
   public final CloserRule closer = new CloserRule(false);
@@ -163,6 +174,7 @@ public class IndexMergerTestBase extends 
InitializedNullHandlingTest
                               .withLongEncoding(longEncodingStrategy)
                               .build();
     this.indexIO = TestHelper.getTestIndexIO();
+    this.serdeFactory = bitmapSerdeFactory;
     this.useBitmapIndexes = bitmapSerdeFactory != null;
   }
 
@@ -3031,6 +3043,229 @@ public class IndexMergerTestBase extends 
InitializedNullHandlingTest
     validateTestMaxColumnsToMergeOutputSegment(merged7);
   }
 
+  @Test
+  public void testMergeProjections() throws IOException
+  {
+    File tmp = FileUtils.createTempDir();
+    closer.closeLater(tmp::delete);
+
+    final DateTime timestamp = 
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart();
+
+    final RowSignature rowSignature = RowSignature.builder()
+                                                  .add("a", ColumnType.STRING)
+                                                  .add("b", ColumnType.STRING)
+                                                  .add("c", ColumnType.LONG)
+                                                  .build();
+
+    final List<InputRow> rows1 = Arrays.asList(
+        new ListBasedInputRow(
+            rowSignature,
+            timestamp,
+            rowSignature.getColumnNames(),
+            Arrays.asList("a", "x", 1L)
+        ),
+        new ListBasedInputRow(
+            rowSignature,
+            timestamp.plusMinutes(1),
+            rowSignature.getColumnNames(),
+            Arrays.asList("b", "y", 2L)
+        ),
+        new ListBasedInputRow(
+            rowSignature,
+            timestamp.plusHours(2),
+            rowSignature.getColumnNames(),
+            Arrays.asList("a", "z", 3L)
+        )
+    );
+
+    final List<InputRow> rows2 = Arrays.asList(
+        new ListBasedInputRow(
+            rowSignature,
+            timestamp,
+            rowSignature.getColumnNames(),
+            Arrays.asList("b", "y", 1L)
+        ),
+        new ListBasedInputRow(
+            rowSignature,
+            timestamp.plusMinutes(1),
+            rowSignature.getColumnNames(),
+            Arrays.asList("d", "w", 2L)
+        ),
+        new ListBasedInputRow(
+            rowSignature,
+            timestamp.plusHours(2),
+            rowSignature.getColumnNames(),
+            Arrays.asList("b", "z", 3L)
+        )
+    );
+
+    final DimensionsSpec.Builder dimensionsBuilder =
+        DimensionsSpec.builder()
+                      .setDimensions(
+                          Arrays.asList(
+                              new StringDimensionSchema("a"),
+                              new StringDimensionSchema("b")
+                          )
+                      );
+
+    List<AggregateProjectionSpec> projections = Arrays.asList(
+        new AggregateProjectionSpec(
+            "a_hourly_c_sum",
+            VirtualColumns.create(
+                Granularities.toVirtualColumn(Granularities.HOUR, "__gran")
+            ),
+            Arrays.asList(
+                new StringDimensionSchema("a"),
+                new LongDimensionSchema("__gran")
+            ),
+            new AggregatorFactory[]{
+                new LongSumAggregatorFactory("c_sum", "c")
+            }
+        ),
+        new AggregateProjectionSpec(
+            "a_c_sum",
+            VirtualColumns.EMPTY,
+            Collections.singletonList(
+                new StringDimensionSchema("a")
+            ),
+            new AggregatorFactory[]{
+                new LongSumAggregatorFactory("c_sum", "c")
+            }
+        )
+    );
+
+    IndexBuilder bob = IndexBuilder.create()
+                                   .tmpDir(tmp)
+                                   .schema(
+                                       IncrementalIndexSchema.builder()
+                                                             
.withDimensionsSpec(dimensionsBuilder.build())
+                                                             .withRollup(false)
+                                                             
.withProjections(projections)
+                                                             .build()
+                                   )
+                                   .rows(rows1);
+
+    IndexBuilder bob2 = IndexBuilder.create()
+                                    .tmpDir(tmp)
+                                    .schema(
+                                        IncrementalIndexSchema.builder()
+                                                              
.withDimensionsSpec(dimensionsBuilder.build())
+                                                              
.withRollup(false)
+                                                              
.withProjections(projections)
+                                                              .build()
+                                    )
+                                    .rows(rows2);
+
+    QueryableIndex q1 = bob.buildMMappedIndex();
+    QueryableIndex q2 = bob2.buildMMappedIndex();
+
+    QueryableIndex merged = indexIO.loadIndex(
+        indexMerger.merge(
+            Arrays.asList(
+                new QueryableIndexIndexableAdapter(q1),
+                new QueryableIndexIndexableAdapter(q2)
+            ),
+            true,
+            new AggregatorFactory[0],
+            temporaryFolder.newFolder(),
+            dimensionsBuilder.build(),
+            IndexSpec.DEFAULT,
+            -1
+        )
+    );
+
+    CursorBuildSpec p1Spec = CursorBuildSpec.builder()
+                                            .setQueryContext(
+                                                QueryContext.of(
+                                                    
ImmutableMap.of(QueryContexts.USE_PROJECTION, "a_hourly_c_sum")
+                                                )
+                                            )
+                                            .setVirtualColumns(
+                                                VirtualColumns.create(
+                                                    
Granularities.toVirtualColumn(Granularities.HOUR, "gran")
+                                                )
+                                            )
+                                            .setAggregators(
+                                                Collections.singletonList(
+                                                    new 
LongSumAggregatorFactory("c", "c")
+                                                )
+                                            )
+                                            
.setGroupingColumns(Collections.singletonList("a"))
+                                            .build();
+    CursorBuildSpec p2Spec = CursorBuildSpec.builder()
+                                            .setQueryContext(
+                                                QueryContext.of(
+                                                    
ImmutableMap.of(QueryContexts.USE_PROJECTION, "a_c_sum")
+                                                )
+                                            )
+                                            .setAggregators(
+                                                Collections.singletonList(
+                                                    new 
LongSumAggregatorFactory("c", "c")
+                                                )
+                                            )
+                                            
.setGroupingColumns(Collections.singletonList("a"))
+                                            .build();
+
+
+    QueryableIndexCursorFactory cursorFactory = new 
QueryableIndexCursorFactory(merged);
+
+    try (final CursorHolder cursorHolder = 
cursorFactory.makeCursorHolder(p1Spec)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      Assert.assertEquals(5, rowCount);
+    }
+
+    try (final CursorHolder cursorHolder = 
cursorFactory.makeCursorHolder(p2Spec)) {
+      final Cursor cursor = cursorHolder.asCursor();
+      int rowCount = 0;
+      while (!cursor.isDone()) {
+        rowCount++;
+        cursor.advance();
+      }
+      Assert.assertEquals(3, rowCount);
+    }
+
+    QueryableIndex p1Index = 
merged.getProjectionQueryableIndex("a_hourly_c_sum");
+    Assert.assertNotNull(p1Index);
+    ColumnHolder aHolder = p1Index.getColumnHolder("a");
+    DictionaryEncodedColumn aCol = (DictionaryEncodedColumn) 
aHolder.getColumn();
+    Assert.assertEquals(3, aCol.getCardinality());
+
+    QueryableIndex p2Index = merged.getProjectionQueryableIndex("a_c_sum");
+    Assert.assertNotNull(p2Index);
+    ColumnHolder aHolder2 = p2Index.getColumnHolder("a");
+    DictionaryEncodedColumn aCol2 = (DictionaryEncodedColumn) 
aHolder2.getColumn();
+    Assert.assertEquals(3, aCol2.getCardinality());
+
+    if (serdeFactory != null) {
+
+      BitmapResultFactory resultFactory = new 
DefaultBitmapResultFactory(serdeFactory.getBitmapFactory());
+
+      Assert.assertEquals(
+          2,
+          resultFactory.toImmutableBitmap(
+              aHolder.getIndexSupplier()
+                     .as(ValueIndexes.class)
+                     .forValue("a", ColumnType.STRING)
+                     .computeBitmapResult(resultFactory, false)
+          ).size()
+      );
+
+      Assert.assertEquals(
+          1,
+          resultFactory.toImmutableBitmap(
+              aHolder2.getIndexSupplier()
+                      .as(ValueIndexes.class)
+                      .forValue("a", ColumnType.STRING)
+                      .computeBitmapResult(resultFactory, false)
+          ).size()
+      );
+    }
+  }
 
   private QueryableIndex persistAndLoad(List<DimensionSchema> schema, 
InputRow... rows) throws IOException
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
index bba9a639803..d12ffc07f74 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/DictionaryIdLookupTest.java
@@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteOrder;
-import java.nio.file.Path;
 
 public class DictionaryIdLookupTest extends InitializedNullHandlingTest
 {
@@ -92,12 +91,12 @@ public class DictionaryIdLookupTest extends 
InitializedNullHandlingTest
         4
     );
 
-    Path dictTempPath = temp.newFolder().toPath();
+    File dictTempDir = temp.newFolder();
 
     // make lookup with references to writers
     DictionaryIdLookup idLookup = new DictionaryIdLookup(
         "test",
-        dictTempPath,
+        dictTempDir,
         stringWriter,
         longWriter,
         doubleWriter,
@@ -110,7 +109,7 @@ public class DictionaryIdLookupTest extends 
InitializedNullHandlingTest
     doubleWriter.open();
     arrayWriter.open();
 
-    File tempDir = dictTempPath.toFile();
+    File tempDir = dictTempDir;
     Assert.assertEquals(0, tempDir.listFiles().length);
 
     for (String s : sortedValueDictionary.getSortedStrings()) {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
index 64ba2679f04..87e457ef54e 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java
@@ -197,7 +197,7 @@ public class NestedDataColumnSupplierTest extends 
InitializedNullHandlingTest
       SortedValueDictionary globalDictionarySortedCollector = 
mergable.getValueDictionary();
       mergable.mergeFieldsInto(sortedFields);
 
-      serializer.openDictionaryWriter();
+      serializer.openDictionaryWriter(tempFolder.newFolder());
       serializer.serializeFields(sortedFields);
       serializer.serializeDictionaries(
           globalDictionarySortedCollector.getSortedStrings(),
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
index f483e297be0..ccc425b1700 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
@@ -150,7 +150,7 @@ public class ScalarDoubleColumnSupplierTest extends 
InitializedNullHandlingTest
       SortedValueDictionary globalDictionarySortedCollector = 
mergable.getValueDictionary();
       mergable.mergeFieldsInto(sortedFields);
 
-      serializer.openDictionaryWriter();
+      serializer.openDictionaryWriter(tempFolder.newFolder());
       serializer.serializeDictionaries(
           globalDictionarySortedCollector.getSortedStrings(),
           globalDictionarySortedCollector.getSortedLongs(),
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
index c8830f3aefd..2d40b2dd58f 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
@@ -150,7 +150,7 @@ public class ScalarLongColumnSupplierTest extends 
InitializedNullHandlingTest
       SortedValueDictionary globalDictionarySortedCollector = 
mergable.getValueDictionary();
       mergable.mergeFieldsInto(sortedFields);
 
-      serializer.openDictionaryWriter();
+      serializer.openDictionaryWriter(tempFolder.newFolder());
       serializer.serializeDictionaries(
           globalDictionarySortedCollector.getSortedStrings(),
           globalDictionarySortedCollector.getSortedLongs(),
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
index d72970b3b12..110e5d68b83 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
@@ -150,7 +150,7 @@ public class ScalarStringColumnSupplierTest extends 
InitializedNullHandlingTest
       SortedValueDictionary globalDictionarySortedCollector = 
mergable.getValueDictionary();
       mergable.mergeFieldsInto(sortedFields);
 
-      serializer.openDictionaryWriter();
+      serializer.openDictionaryWriter(tempFolder.newFolder());
       serializer.serializeDictionaries(
           globalDictionarySortedCollector.getSortedStrings(),
           globalDictionarySortedCollector.getSortedLongs(),
diff --git 
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
 
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 6aea2ace234..6ab748d6111 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -275,7 +275,7 @@ public class VariantColumnSupplierTest extends 
InitializedNullHandlingTest
           closer
       );
 
-      serializer.openDictionaryWriter();
+      serializer.openDictionaryWriter(tempFolder.newFolder());
       serializer.serializeDictionaries(
           globalDictionarySortedCollector.getSortedStrings(),
           globalDictionarySortedCollector.getSortedLongs(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to