This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 313da988794 decouple column serializer compression closers from 
SegmentWriteoutMedium to optionally allow serializers to release direct memory 
allocated for compression earlier than when segment is completed (#16076)
313da988794 is described below

commit 313da988794d81b94ebbbe5bc3ba8b5e3c4fd6c7
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Mar 11 12:28:04 2024 -0700

    decouple column serializer compression closers from SegmentWriteoutMedium 
to optionally allow serializers to release direct memory allocated for 
compression earlier than when segment is completed (#16076)
---
 .../compression/BaseColumnarLongsBenchmark.java    | 18 ++++++++----
 .../FloatCompressionBenchmarkFileGenerator.java    |  7 +++--
 .../LongCompressionBenchmarkFileGenerator.java     |  7 +++--
 .../CompressedBigDecimalLongColumnSerializer.java  | 11 +++++--
 .../segment/DictionaryEncodedColumnMerger.java     |  3 +-
 .../druid/segment/DoubleColumnSerializer.java      |  3 +-
 .../druid/segment/DoubleColumnSerializerV2.java    |  3 +-
 .../druid/segment/FloatColumnSerializer.java       |  3 +-
 .../druid/segment/FloatColumnSerializerV2.java     |  3 +-
 .../org/apache/druid/segment/IndexMergerV9.java    |  1 -
 .../apache/druid/segment/LongColumnSerializer.java |  3 +-
 .../druid/segment/LongColumnSerializerV2.java      |  3 +-
 .../data/BlockLayoutColumnarDoublesSerializer.java |  7 +++--
 .../data/BlockLayoutColumnarFloatsSerializer.java  |  7 +++--
 .../data/BlockLayoutColumnarLongsSerializer.java   | 15 +++++++---
 .../segment/data/CompressedBlockSerializer.java    | 11 ++++---
 .../data/CompressedColumnarIntsSerializer.java     | 15 +++++-----
 .../segment/data/CompressedLongsSerializer.java    | 10 +++++--
 .../CompressedVSizeColumnarIntsSerializer.java     | 24 ++++++++-------
 ...ompressedVariableSizedBlobColumnSerializer.java |  9 ++++--
 .../druid/segment/data/CompressionFactory.java     | 22 +++++++++-----
 .../druid/segment/data/GenericIndexedWriter.java   |  5 ++--
 .../data/IntermediateColumnarLongsSerializer.java  | 10 +++++--
 ...CompressedVSizeColumnarMultiIntsSerializer.java |  6 ++--
 .../GlobalDictionaryEncodedFieldColumnWriter.java  |  7 ++++-
 .../nested/ScalarDoubleColumnSerializer.java       |  3 +-
 .../nested/ScalarDoubleFieldColumnWriter.java      |  3 +-
 .../segment/nested/ScalarLongColumnSerializer.java |  3 +-
 .../nested/ScalarLongFieldColumnWriter.java        |  3 +-
 .../ScalarNestedCommonFormatColumnSerializer.java  |  3 +-
 .../segment/nested/VariantColumnSerializer.java    |  3 +-
 .../data/CompressedColumnarIntsSerializerTest.java | 14 +++++----
 .../segment/data/CompressedDoublesSerdeTest.java   | 12 ++++++--
 .../segment/data/CompressedFloatsSerdeTest.java    |  9 ++++--
 .../data/CompressedLongsAutoEncodingSerdeTest.java |  8 +++--
 .../segment/data/CompressedLongsSerdeTest.java     | 12 ++++++--
 .../CompressedVSizeColumnarIntsSerializerTest.java | 17 ++++++-----
 .../data/CompressedVariableSizeBlobColumnTest.java |  4 ++-
 ...ressedVSizeColumnarMultiIntsSerializerTest.java | 34 +++++++++++++---------
 39 files changed, 224 insertions(+), 117 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
index b26e1dc5892..f912f5e70b2 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
@@ -240,7 +240,8 @@ public class BaseColumnarLongsBenchmark
             "lz4-longs",
             ByteOrder.LITTLE_ENDIAN,
             CompressionFactory.LongEncodingStrategy.LONGS,
-            CompressionStrategy.LZ4
+            CompressionStrategy.LZ4,
+            writeOutMedium.getCloser()
         );
         break;
       case "lz4-auto":
@@ -250,7 +251,8 @@ public class BaseColumnarLongsBenchmark
             "lz4-auto",
             ByteOrder.LITTLE_ENDIAN,
             CompressionFactory.LongEncodingStrategy.AUTO,
-            CompressionStrategy.LZ4
+            CompressionStrategy.LZ4,
+            writeOutMedium.getCloser()
         );
         break;
       case "none-longs":
@@ -260,7 +262,8 @@ public class BaseColumnarLongsBenchmark
             "none-longs",
             ByteOrder.LITTLE_ENDIAN,
             CompressionFactory.LongEncodingStrategy.LONGS,
-            CompressionStrategy.NONE
+            CompressionStrategy.NONE,
+            writeOutMedium.getCloser()
         );
         break;
       case "none-auto":
@@ -270,7 +273,8 @@ public class BaseColumnarLongsBenchmark
             "none-auto",
             ByteOrder.LITTLE_ENDIAN,
             CompressionFactory.LongEncodingStrategy.AUTO,
-            CompressionStrategy.NONE
+            CompressionStrategy.NONE,
+            writeOutMedium.getCloser()
         );
         break;
       case "zstd-longs":
@@ -280,7 +284,8 @@ public class BaseColumnarLongsBenchmark
                 "zstd-longs",
                 ByteOrder.LITTLE_ENDIAN,
                 CompressionFactory.LongEncodingStrategy.LONGS,
-                CompressionStrategy.ZSTD
+                CompressionStrategy.ZSTD,
+                writeOutMedium.getCloser()
         );
         break;
       case "zstd-auto":
@@ -290,7 +295,8 @@ public class BaseColumnarLongsBenchmark
                 "zstd-auto",
                 ByteOrder.LITTLE_ENDIAN,
                 CompressionFactory.LongEncodingStrategy.AUTO,
-                CompressionStrategy.ZSTD
+                CompressionStrategy.ZSTD,
+                writeOutMedium.getCloser()
         );
         break;
       default:
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
index 82709a6c4b3..9715db49e0e 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
@@ -29,6 +29,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.generator.ColumnValueGenerator;
 import org.apache.druid.segment.generator.GeneratorColumnSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -155,12 +156,14 @@ public class FloatCompressionBenchmarkFileGenerator
         compFile.delete();
         File dataFile = new File(dir, entry.getKey());
 
+        SegmentWriteOutMedium segmentWriteOutMedium = new 
OffHeapMemorySegmentWriteOutMedium();
         ColumnarFloatsSerializer writer = 
CompressionFactory.getFloatSerializer(
             "float-benchmark",
-            new OffHeapMemorySegmentWriteOutMedium(),
+            segmentWriteOutMedium,
             "float",
             ByteOrder.nativeOrder(),
-            compression
+            compression,
+            segmentWriteOutMedium.getCloser()
         );
         try (
             BufferedReader br = Files.newBufferedReader(dataFile.toPath(), 
StandardCharsets.UTF_8);
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
index 55d5f6b82bb..b1786e82f45 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
@@ -29,6 +29,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.generator.ColumnValueGenerator;
 import org.apache.druid.segment.generator.GeneratorColumnSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -148,13 +149,15 @@ public class LongCompressionBenchmarkFileGenerator
           compFile.delete();
           File dataFile = new File(dir, entry.getKey());
 
+          SegmentWriteOutMedium segmentWriteOutMedium = new 
OffHeapMemorySegmentWriteOutMedium();
           ColumnarLongsSerializer writer = 
CompressionFactory.getLongSerializer(
               "long-benchmark",
-              new OffHeapMemorySegmentWriteOutMedium(),
+              segmentWriteOutMedium,
               "long",
               ByteOrder.nativeOrder(),
               encoding,
-              compression
+              compression,
+              segmentWriteOutMedium.getCloser()
           );
           try (
               BufferedReader br = Files.newBufferedReader(dataFile.toPath(), 
StandardCharsets.UTF_8);
diff --git 
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
 
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
index e6b6539fe38..ef899c2f455 100644
--- 
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
+++ 
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
@@ -49,7 +49,8 @@ public class CompressedBigDecimalLongColumnSerializer 
implements GenericColumnSe
    */
   public static CompressedBigDecimalLongColumnSerializer create(
       SegmentWriteOutMedium segmentWriteOutMedium,
-      String filenameBase)
+      String filenameBase
+  )
   {
     return new CompressedBigDecimalLongColumnSerializer(
         CompressedVSizeColumnarIntsSerializer.create(
@@ -57,13 +58,17 @@ public class CompressedBigDecimalLongColumnSerializer 
implements GenericColumnSe
             segmentWriteOutMedium,
             String.format(Locale.ROOT, "%s.scale", filenameBase),
             16,
-            CompressionStrategy.LZ4),
+            CompressionStrategy.LZ4,
+            segmentWriteOutMedium.getCloser()
+        ),
         V3CompressedVSizeColumnarMultiIntsSerializer.create(
             "dummy",
             segmentWriteOutMedium,
             String.format(Locale.ROOT, "%s.magnitude", filenameBase),
             Integer.MAX_VALUE,
-            CompressionStrategy.LZ4));
+            CompressionStrategy.LZ4
+        )
+    );
   }
 
   private final CompressedVSizeColumnarIntsSerializer scaleWriter;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
index 15f7f51120d..f1d9d7c5bb4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
@@ -423,7 +423,8 @@ public abstract class DictionaryEncodedColumnMerger<T 
extends Comparable<T>> imp
             segmentWriteOutMedium,
             filenameBase,
             cardinality,
-            compressionStrategy
+            compressionStrategy,
+            segmentWriteOutMedium.getCloser()
         );
       } else {
         encodedValueSerializer = new 
VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java 
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
index cc525cb7ba8..1b87fdb9ceb 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
@@ -72,7 +72,8 @@ public class DoubleColumnSerializer implements 
GenericColumnSerializer<Object>
         segmentWriteOutMedium,
         StringUtils.format("%s.double_column", filenameBase),
         byteOrder,
-        compression
+        compression,
+        segmentWriteOutMedium.getCloser()
     );
     writer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
 
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
index 02903eb4a27..a678c695e43 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
@@ -99,7 +99,8 @@ public class DoubleColumnSerializerV2 implements 
GenericColumnSerializer<Object>
         segmentWriteOutMedium,
         StringUtils.format("%s.double_column", filenameBase),
         byteOrder,
-        compression
+        compression,
+        segmentWriteOutMedium.getCloser()
     );
     writer.open();
     nullValueBitmapWriter = new ByteBufferWriter<>(
diff --git 
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java 
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
index b96d520e2e2..e1d23946d5b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
@@ -72,7 +72,8 @@ public class FloatColumnSerializer implements 
GenericColumnSerializer<Object>
         segmentWriteOutMedium,
         StringUtils.format("%s.float_column", filenameBase),
         byteOrder,
-        compression
+        compression,
+        segmentWriteOutMedium.getCloser()
     );
     writer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
 
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
index b5371a0aac9..5930ae081ea 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
@@ -99,7 +99,8 @@ public class FloatColumnSerializerV2 implements 
GenericColumnSerializer<Object>
         segmentWriteOutMedium,
         StringUtils.format("%s.float_column", filenameBase),
         byteOrder,
-        compression
+        compression,
+        segmentWriteOutMedium.getCloser()
     );
     writer.open();
     nullValueBitmapWriter = new ByteBufferWriter<>(
diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 6b1ff6b3195..05c721e5c56 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -202,7 +202,6 @@ public class IndexMergerV9 implements IndexMerger
         mergers.add(
             handler.makeMerger(
                 indexSpec,
-
                 segmentWriteOutMedium,
                 dimFormats.get(i).toColumnCapabilities(),
                 progress,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java 
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
index 1f8d03bebc6..6a4bcacfbb8 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
@@ -80,7 +80,8 @@ public class LongColumnSerializer implements 
GenericColumnSerializer<Object>
         StringUtils.format("%s.long_column", filenameBase),
         byteOrder,
         encoding,
-        compression
+        compression,
+        segmentWriteOutMedium.getCloser()
     );
     writer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java 
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
index 364a7af6825..226cd8bafb0 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
@@ -105,7 +105,8 @@ public class LongColumnSerializerV2 implements 
GenericColumnSerializer<Object>
         StringUtils.format("%s.long_column", filenameBase),
         byteOrder,
         encoding,
-        compression
+        compression,
+        segmentWriteOutMedium.getCloser()
     );
     writer.open();
     nullValueBitmapWriter = new ByteBufferWriter<>(
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
index 2b4612ecb3a..8c2dfb9c028 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
@@ -56,7 +56,8 @@ public class BlockLayoutColumnarDoublesSerializer implements 
ColumnarDoublesSeri
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
-      CompressionStrategy compression
+      CompressionStrategy compression,
+      Closer closer
   )
   {
     this.columnName = columnName;
@@ -64,11 +65,11 @@ public class BlockLayoutColumnarDoublesSerializer 
implements ColumnarDoublesSeri
         segmentWriteOutMedium,
         filenameBase,
         compression,
-        CompressedPools.BUFFER_SIZE
+        CompressedPools.BUFFER_SIZE,
+        closer
     );
     this.compression = compression;
     CompressionStrategy.Compressor compressor = compression.getCompressor();
-    Closer closer = segmentWriteOutMedium.getCloser();
     this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE, 
closer).order(byteOrder);
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
index 94a3ef6319f..5640339a316 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
@@ -56,7 +56,8 @@ public class BlockLayoutColumnarFloatsSerializer implements 
ColumnarFloatsSerial
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
-      CompressionStrategy compression
+      CompressionStrategy compression,
+      Closer closer
   )
   {
     this.columnName = columnName;
@@ -64,11 +65,11 @@ public class BlockLayoutColumnarFloatsSerializer implements 
ColumnarFloatsSerial
         segmentWriteOutMedium,
         filenameBase,
         compression,
-        CompressedPools.BUFFER_SIZE
+        CompressedPools.BUFFER_SIZE,
+        closer
     );
     this.compression = compression;
     CompressionStrategy.Compressor compressor = compression.getCompressor();
-    Closer closer = segmentWriteOutMedium.getCloser();
     this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE, 
closer).order(byteOrder);
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index ff47a34cca4..37d468d62e4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -19,13 +19,13 @@
 
 package org.apache.druid.segment.data;
 
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -59,17 +59,24 @@ public class BlockLayoutColumnarLongsSerializer implements 
ColumnarLongsSerializ
       String filenameBase,
       ByteOrder byteOrder,
       CompressionFactory.LongEncodingWriter writer,
-      CompressionStrategy compression
+      CompressionStrategy compression,
+      Closer closer
   )
   {
     this.columnName = columnName;
     this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE);
     int bufferSize = writer.getNumBytes(sizePer);
-    this.flattener = 
GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium, 
filenameBase, compression, bufferSize);
+    this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
+        segmentWriteOutMedium,
+        filenameBase,
+        compression,
+        bufferSize,
+        closer
+    );
     this.writer = writer;
     this.compression = compression;
     CompressionStrategy.Compressor compressor = compression.getCompressor();
-    endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer), 
segmentWriteOutMedium.getCloser()).order(byteOrder);
+    endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer), 
closer).order(byteOrder);
     writer.setBuffer(endBuffer);
     numInsertedForNextFlush = sizePer;
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
index 07208160af8..d5beedcae51 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.data;
 
 import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
@@ -60,18 +61,16 @@ public class CompressedBlockSerializer implements Serializer
 
   public CompressedBlockSerializer(
       SegmentWriteOutMedium segmentWriteOutMedium,
-
       CompressionStrategy compression,
-      int blockSize
+      int blockSize,
+      Closer closer
   )
   {
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.compression = compression;
     this.compressor = compression.getCompressor();
-    this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize, 
segmentWriteOutMedium.getCloser())
-                                            .order(ByteOrder.nativeOrder());
-    this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize, 
segmentWriteOutMedium.getCloser())
-                                          .order(ByteOrder.nativeOrder());
+    this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize, 
closer).order(ByteOrder.nativeOrder());
+    this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize, 
closer).order(ByteOrder.nativeOrder());
   }
 
   public void open() throws IOException
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
index a0442d95adf..cc724ba1cc7 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
@@ -58,12 +58,12 @@ public class CompressedColumnarIntsSerializer extends 
SingleValueColumnarIntsSer
       final String filenameBase,
       final int chunkFactor,
       final ByteOrder byteOrder,
-      final CompressionStrategy compression
+      final CompressionStrategy compression,
+      final Closer closer
   )
   {
     this(
         columnName,
-        segmentWriteOutMedium,
         chunkFactor,
         byteOrder,
         compression,
@@ -71,18 +71,20 @@ public class CompressedColumnarIntsSerializer extends 
SingleValueColumnarIntsSer
             segmentWriteOutMedium,
             filenameBase,
             compression,
-            chunkFactor * Integer.BYTES
-        )
+            chunkFactor * Integer.BYTES,
+            closer
+        ),
+        closer
     );
   }
 
   CompressedColumnarIntsSerializer(
       final String columnName,
-      final SegmentWriteOutMedium segmentWriteOutMedium,
       final int chunkFactor,
       final ByteOrder byteOrder,
       final CompressionStrategy compression,
-      final GenericIndexedWriter<ByteBuffer> flattener
+      final GenericIndexedWriter<ByteBuffer> flattener,
+      final Closer closer
   )
   {
     this.columnName = columnName;
@@ -90,7 +92,6 @@ public class CompressedColumnarIntsSerializer extends 
SingleValueColumnarIntsSer
     this.compression = compression;
     this.flattener = flattener;
     CompressionStrategy.Compressor compressor = compression.getCompressor();
-    Closer closer = segmentWriteOutMedium.getCloser();
     this.endBuffer = compressor.allocateInBuffer(chunkFactor * Integer.BYTES, 
closer).order(byteOrder);
     this.numInserted = 0;
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
index cbf9211ec17..cfc56b84c21 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.data;
 
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.Serializer;
@@ -34,12 +35,17 @@ public class CompressedLongsSerializer implements Serializer
   private final CompressedBlockSerializer blockSerializer;
   private final ByteBuffer longValueConverter = 
ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder());
 
-  public CompressedLongsSerializer(SegmentWriteOutMedium 
segmentWriteOutMedium, CompressionStrategy compression)
+  public CompressedLongsSerializer(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      CompressionStrategy compression,
+      Closer closer
+  )
   {
     this.blockSerializer = new CompressedBlockSerializer(
         segmentWriteOutMedium,
         compression,
-        CompressedPools.BUFFER_SIZE
+        CompressedPools.BUFFER_SIZE,
+        closer
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
index 6060be2cc3c..84f4799e6d2 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
@@ -20,13 +20,13 @@
 package org.apache.druid.segment.data;
 
 import org.apache.druid.common.utils.ByteUtils;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -51,7 +51,8 @@ public class CompressedVSizeColumnarIntsSerializer extends 
SingleValueColumnarIn
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
       final int maxValue,
-      final CompressionStrategy compression
+      final CompressionStrategy compression,
+      final Closer closer
   )
   {
     return new CompressedVSizeColumnarIntsSerializer(
@@ -61,7 +62,8 @@ public class CompressedVSizeColumnarIntsSerializer extends 
SingleValueColumnarIn
         maxValue,
         CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue),
         IndexIO.BYTE_ORDER,
-        compression
+        compression,
+        closer
     );
   }
 
@@ -84,12 +86,12 @@ public class CompressedVSizeColumnarIntsSerializer extends 
SingleValueColumnarIn
       final int maxValue,
       final int chunkFactor,
       final ByteOrder byteOrder,
-      final CompressionStrategy compression
+      final CompressionStrategy compression,
+      final Closer closer
   )
   {
     this(
         columnName,
-        segmentWriteOutMedium,
         maxValue,
         chunkFactor,
         byteOrder,
@@ -98,19 +100,21 @@ public class CompressedVSizeColumnarIntsSerializer extends 
SingleValueColumnarIn
             segmentWriteOutMedium,
             filenameBase,
             compression,
-            sizePer(maxValue, chunkFactor)
-        )
+            sizePer(maxValue, chunkFactor),
+            closer
+        ),
+        closer
     );
   }
 
   CompressedVSizeColumnarIntsSerializer(
       final String columnName,
-      final SegmentWriteOutMedium segmentWriteOutMedium,
       final int maxValue,
       final int chunkFactor,
       final ByteOrder byteOrder,
       final CompressionStrategy compression,
-      final GenericIndexedWriter<ByteBuffer> flattener
+      final GenericIndexedWriter<ByteBuffer> flattener,
+      final Closer closer
   )
   {
     this.columnName = columnName;
@@ -122,7 +126,7 @@ public class CompressedVSizeColumnarIntsSerializer extends 
SingleValueColumnarIn
     this.flattener = flattener;
     this.intBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
     CompressionStrategy.Compressor compressor = compression.getCompressor();
-    this.endBuffer = compressor.allocateInBuffer(chunkBytes, 
segmentWriteOutMedium.getCloser()).order(byteOrder);
+    this.endBuffer = compressor.allocateInBuffer(chunkBytes, 
closer).order(byteOrder);
     this.numInserted = 0;
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
index 19aa9e445a1..6693daa4326 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
@@ -66,13 +66,18 @@ public class CompressedVariableSizedBlobColumnSerializer 
implements Serializer
   {
     numValues = 0;
     currentOffset = 0;
-    offsetsSerializer = new CompressedLongsSerializer(segmentWriteOutMedium, 
compression);
+    offsetsSerializer = new CompressedLongsSerializer(
+        segmentWriteOutMedium,
+        compression,
+        segmentWriteOutMedium.getCloser()
+    );
     offsetsSerializer.open();
 
     valuesSerializer = new CompressedBlockSerializer(
         segmentWriteOutMedium,
         compression,
-        CompressedPools.BUFFER_SIZE
+        CompressedPools.BUFFER_SIZE,
+        segmentWriteOutMedium.getCloser()
     );
     valuesSerializer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
index 10943316b70..dde6a440d9e 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
@@ -324,7 +325,8 @@ public class CompressionFactory
       String filenameBase,
       ByteOrder order,
       LongEncodingStrategy encodingStrategy,
-      CompressionStrategy compressionStrategy
+      CompressionStrategy compressionStrategy,
+      Closer closer
   )
   {
     if (encodingStrategy == LongEncodingStrategy.AUTO) {
@@ -333,7 +335,8 @@ public class CompressionFactory
           segmentWriteOutMedium,
           filenameBase,
           order,
-          compressionStrategy
+          compressionStrategy,
+          closer
       );
     } else if (encodingStrategy == LongEncodingStrategy.LONGS) {
       if (compressionStrategy == CompressionStrategy.NONE) {
@@ -349,7 +352,8 @@ public class CompressionFactory
             filenameBase,
             order,
             new LongsLongEncodingWriter(order),
-            compressionStrategy
+            compressionStrategy,
+            closer
         );
       }
     } else {
@@ -379,7 +383,8 @@ public class CompressionFactory
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder order,
-      CompressionStrategy compressionStrategy
+      CompressionStrategy compressionStrategy,
+      Closer closer
   )
   {
     if (compressionStrategy == CompressionStrategy.NONE) {
@@ -390,7 +395,8 @@ public class CompressionFactory
           segmentWriteOutMedium,
           filenameBase,
           order,
-          compressionStrategy
+          compressionStrategy,
+          closer
       );
     }
   }
@@ -417,7 +423,8 @@ public class CompressionFactory
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
-      CompressionStrategy compression
+      CompressionStrategy compression,
+      Closer closer
   )
   {
     if (compression == CompressionStrategy.NONE) {
@@ -428,7 +435,8 @@ public class CompressionFactory
           segmentWriteOutMedium,
           filenameBase,
           byteOrder,
-          compression
+          compression,
+          closer
       );
     }
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index a69d645be39..8b38125322b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -77,13 +77,14 @@ public class GenericIndexedWriter<T> implements 
DictionaryWriter<T>
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
       final CompressionStrategy compressionStrategy,
-      final int bufferSize
+      final int bufferSize,
+      final Closer closer
   )
   {
     GenericIndexedWriter<ByteBuffer> writer = new GenericIndexedWriter<>(
         segmentWriteOutMedium,
         filenameBase,
-        compressedByteBuffersWriteObjectStrategy(compressionStrategy, 
bufferSize, segmentWriteOutMedium.getCloser())
+        compressedByteBuffersWriteObjectStrategy(compressionStrategy, 
bufferSize, closer)
     );
     writer.objectsSorted = false;
     return writer;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
index c0f9355350a..7403f8dfd20 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
@@ -24,11 +24,11 @@ import it.unimi.dsi.fastutil.longs.Long2IntMap;
 import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
@@ -45,6 +45,7 @@ public class IntermediateColumnarLongsSerializer implements 
ColumnarLongsSeriali
   private final String filenameBase;
   private final ByteOrder order;
   private final CompressionStrategy compression;
+  private final Closer closer;
 
   private int numInserted = 0;
 
@@ -64,7 +65,8 @@ public class IntermediateColumnarLongsSerializer implements 
ColumnarLongsSeriali
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder order,
-      CompressionStrategy compression
+      CompressionStrategy compression,
+      Closer closer
   )
   {
     this.columnName = columnName;
@@ -72,6 +74,7 @@ public class IntermediateColumnarLongsSerializer implements 
ColumnarLongsSeriali
     this.filenameBase = filenameBase;
     this.order = order;
     this.compression = compression;
+    this.closer = closer;
   }
 
   @Override
@@ -141,7 +144,8 @@ public class IntermediateColumnarLongsSerializer implements 
ColumnarLongsSeriali
           filenameBase,
           order,
           writer,
-          compression
+          compression,
+          closer
       );
     }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
index f6690293012..0fac36399d1 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
@@ -51,7 +51,8 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer 
extends ColumnarMultiI
             filenameBase,
             CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
             IndexIO.BYTE_ORDER,
-            compression
+            compression,
+            segmentWriteOutMedium.getCloser()
         ),
         new CompressedVSizeColumnarIntsSerializer(
             columnName,
@@ -60,7 +61,8 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer 
extends ColumnarMultiI
             maxValue,
             
CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue),
             IndexIO.BYTE_ORDER,
-            compression
+            compression,
+            segmentWriteOutMedium.getCloser()
         )
     );
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
index efa25865fa4..aa6a71ae754 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -29,6 +29,7 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.bitmap.MutableBitmap;
 import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -81,6 +82,8 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
 
   protected final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new 
Int2ObjectRBTreeMap<>();
 
+  protected final Closer fieldResourceCloser = Closer.create();
+
   protected FixedIndexedIntWriter intermediateValueWriter;
   // maybe someday we allow no bitmap indexes or multi-value columns
   protected int flags = DictionaryEncodedColumnPartSerde.NO_FLAGS;
@@ -300,6 +303,7 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
     }
     finally {
       tmpWriteoutMedium.close();
+      fieldResourceCloser.close();
     }
   }
 
@@ -312,7 +316,8 @@ public abstract class 
GlobalDictionaryEncodedFieldColumnWriter<T>
           medium,
           columnName,
           maxId,
-          indexSpec.getDimensionCompression()
+          indexSpec.getDimensionCompression(),
+          fieldResourceCloser
       );
     } else {
       encodedValueSerializer = new VSizeColumnarIntsSerializer(medium, maxId);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
index e077282f98f..874b8b309a4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
@@ -95,7 +95,8 @@ public class ScalarDoubleColumnSerializer extends 
ScalarNestedCommonFormatColumn
         segmentWriteOutMedium,
         StringUtils.format("%s.double_column", name),
         ByteOrder.nativeOrder(),
-        indexSpec.getDimensionCompression()
+        indexSpec.getDimensionCompression(),
+        segmentWriteOutMedium.getCloser()
     );
     doublesSerializer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
index 144e848d831..8ccd528715b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
@@ -67,7 +67,8 @@ public final class ScalarDoubleFieldColumnWriter extends 
GlobalDictionaryEncoded
         segmentWriteOutMedium,
         StringUtils.format("%s.double_column", fieldName),
         ByteOrder.nativeOrder(),
-        indexSpec.getDimensionCompression()
+        indexSpec.getDimensionCompression(),
+        fieldResourceCloser
     );
     doublesSerializer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
index bfb966365e2..46b70d9907c 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
@@ -97,7 +97,8 @@ public class ScalarLongColumnSerializer extends 
ScalarNestedCommonFormatColumnSe
         StringUtils.format("%s.long_column", name),
         ByteOrder.nativeOrder(),
         indexSpec.getLongEncoding(),
-        indexSpec.getDimensionCompression()
+        indexSpec.getDimensionCompression(),
+        segmentWriteOutMedium.getCloser()
     );
     longsSerializer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
index 4ca317edb01..66b5eca18d9 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
@@ -68,7 +68,8 @@ public final class ScalarLongFieldColumnWriter extends 
GlobalDictionaryEncodedFi
         StringUtils.format("%s.long_column", fieldName),
         ByteOrder.nativeOrder(),
         indexSpec.getLongEncoding(),
-        indexSpec.getDimensionCompression()
+        indexSpec.getDimensionCompression(),
+        fieldResourceCloser
     );
     longsSerializer.open();
   }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
index 2caa19ad8d6..771cdb7fb5b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
@@ -184,7 +184,8 @@ public abstract class 
ScalarNestedCommonFormatColumnSerializer<T> extends Nested
         segmentWriteOutMedium,
         filenameBase,
         dictionaryWriter.getCardinality(),
-        compressionToUse
+        compressionToUse,
+        segmentWriteOutMedium.getCloser()
     );
     encodedValueSerializer.open();
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 58464b2c9e6..abd88b57df0 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -342,7 +342,8 @@ public class VariantColumnSerializer extends 
NestedCommonFormatColumnSerializer
         segmentWriteOutMedium,
         filenameBase,
         cardinality,
-        compressionToUse
+        compressionToUse,
+        segmentWriteOutMedium.getCloser()
     );
     encodedValueSerializer.open();
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
index 52dca67257c..6e1c4229084 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
@@ -167,7 +167,8 @@ public class CompressedColumnarIntsSerializerTest
           "test",
           CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
           byteOrder,
-          compressionStrategy
+          compressionStrategy,
+          segmentWriteOutMedium.getCloser()
       );
       serializer.open();
 
@@ -196,7 +197,8 @@ public class CompressedColumnarIntsSerializerTest
         "test",
         chunkFactor,
         byteOrder,
-        compressionStrategy
+        compressionStrategy,
+        segmentWriteOutMedium.getCloser()
     );
     CompressedColumnarIntsSupplier supplierFromList = 
CompressedColumnarIntsSupplier.fromList(
         IntArrayList.wrap(vals),
@@ -227,6 +229,7 @@ public class CompressedColumnarIntsSerializerTest
       Assert.assertEquals(vals[i], columnarInts.get(i));
     }
     CloseableUtils.closeAndWrapExceptions(columnarInts);
+    CloseableUtils.closeAndWrapExceptions(segmentWriteOutMedium);
   }
 
   private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception
@@ -236,7 +239,6 @@ public class CompressedColumnarIntsSerializerTest
 
     CompressedColumnarIntsSerializer writer = new 
CompressedColumnarIntsSerializer(
         "test",
-        segmentWriteOutMedium,
         chunkFactor,
         byteOrder,
         compressionStrategy,
@@ -244,8 +246,10 @@ public class CompressedColumnarIntsSerializerTest
             segmentWriteOutMedium,
             "test",
             compressionStrategy,
-            Long.BYTES * 10000
-        )
+            Long.BYTES * 10000,
+            segmentWriteOutMedium.getCloser()
+        ),
+        segmentWriteOutMedium.getCloser()
     );
 
     writer.open();
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index 2f94a772b4d..be472a71ed9 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -147,7 +147,8 @@ public class CompressedDoublesSerdeTest
           segmentWriteOutMedium,
           "test",
           order,
-          compressionStrategy
+          compressionStrategy,
+          segmentWriteOutMedium.getCloser()
       );
       serializer.open();
 
@@ -160,12 +161,14 @@ public class CompressedDoublesSerdeTest
 
   public void testWithValues(double[] values) throws Exception
   {
+    final SegmentWriteOutMedium segmentWriteOutMedium = new 
OffHeapMemorySegmentWriteOutMedium();
     ColumnarDoublesSerializer serializer = 
CompressionFactory.getDoubleSerializer(
         "test",
-        new OffHeapMemorySegmentWriteOutMedium(),
+        segmentWriteOutMedium,
         "test",
         order,
-        compressionStrategy
+        compressionStrategy,
+        segmentWriteOutMedium.getCloser()
     );
     serializer.open();
 
@@ -190,6 +193,9 @@ public class CompressedDoublesSerdeTest
       }
       testConcurrentThreadReads(supplier, doubles, values);
     }
+    finally {
+      segmentWriteOutMedium.close();
+    }
   }
 
   private void tryFill(ColumnarDoubles indexed, double[] vals, final int 
startIndex, final int size)
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
index 14b935be2f3..02e320f46a4 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
@@ -154,7 +154,8 @@ public class CompressedFloatsSerdeTest
           segmentWriteOutMedium,
           "test",
           order,
-          compressionStrategy
+          compressionStrategy,
+          segmentWriteOutMedium.getCloser()
       );
       serializer.open();
 
@@ -167,12 +168,14 @@ public class CompressedFloatsSerdeTest
 
   public void testWithValues(float[] values) throws Exception
   {
+    SegmentWriteOutMedium segmentWriteOutMedium = new 
OffHeapMemorySegmentWriteOutMedium();
     ColumnarFloatsSerializer serializer = 
CompressionFactory.getFloatSerializer(
         "test",
-        new OffHeapMemorySegmentWriteOutMedium(),
+        segmentWriteOutMedium,
         "test",
         order,
-        compressionStrategy
+        compressionStrategy,
+        segmentWriteOutMedium.getCloser()
     );
     serializer.open();
 
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
index 87daaddbe42..0fd5bbf6f89 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
 
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -95,13 +96,15 @@ public class CompressedLongsAutoEncodingSerdeTest
 
   public void testValues(long[] values) throws Exception
   {
+    SegmentWriteOutMedium segmentWriteOutMedium = new 
OffHeapMemorySegmentWriteOutMedium();
     ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
         "test",
-        new OffHeapMemorySegmentWriteOutMedium(),
+        segmentWriteOutMedium,
         "test",
         order,
         encodingStrategy,
-        compressionStrategy
+        compressionStrategy,
+        segmentWriteOutMedium.getCloser()
     );
     serializer.open();
 
@@ -119,6 +122,7 @@ public class CompressedLongsAutoEncodingSerdeTest
 
     assertIndexMatchesVals(longs, values);
     longs.close();
+    segmentWriteOutMedium.close();
   }
 
   private void assertIndexMatchesVals(ColumnarLongs indexed, long[] vals)
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
index b643ee43d83..ba35a03bff5 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
@@ -154,7 +154,8 @@ public class CompressedLongsSerdeTest
           "test",
           order,
           encodingStrategy,
-          compressionStrategy
+          compressionStrategy,
+          segmentWriteOutMedium.getCloser()
       );
       serializer.open();
 
@@ -173,13 +174,15 @@ public class CompressedLongsSerdeTest
 
   public void testValues(long[] values) throws Exception
   {
+    SegmentWriteOutMedium segmentWriteOutMedium = new 
OffHeapMemorySegmentWriteOutMedium();
     ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
         "test",
-        new OffHeapMemorySegmentWriteOutMedium(),
+        segmentWriteOutMedium,
         "test",
         order,
         encodingStrategy,
-        compressionStrategy
+        compressionStrategy,
+        segmentWriteOutMedium.getCloser()
     );
     serializer.open();
 
@@ -206,6 +209,9 @@ public class CompressedLongsSerdeTest
       testSupplierSerde(supplier, values);
       testConcurrentThreadReads(supplier, longs, values);
     }
+    finally {
+      segmentWriteOutMedium.close();
+    }
   }
 
   private void tryFill(ColumnarLongs indexed, long[] vals, final int 
startIndex, final int size)
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
index bb5868f6df2..c06e11c90d9 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
@@ -124,7 +124,8 @@ public class CompressedVSizeColumnarIntsSerializerTest
         vals.length > 0 ? Ints.max(vals) : 0,
         chunkSize,
         byteOrder,
-        compressionStrategy
+        compressionStrategy,
+        segmentWriteOutMedium.getCloser()
     );
     CompressedVSizeColumnarIntsSupplier supplierFromList = 
CompressedVSizeColumnarIntsSupplier.fromList(
         IntArrayList.wrap(vals),
@@ -197,16 +198,17 @@ public class CompressedVSizeColumnarIntsSerializerTest
           segmentWriteOutMedium,
           "test",
           compressionStrategy,
-          Long.BYTES * 10000
+          Long.BYTES * 10000,
+          segmentWriteOutMedium.getCloser()
       );
       CompressedVSizeColumnarIntsSerializer serializer = new 
CompressedVSizeColumnarIntsSerializer(
           "test",
-          segmentWriteOutMedium,
           maxValue,
           maxChunkSize,
           byteOrder,
           compressionStrategy,
-          genericIndexed
+          genericIndexed,
+          segmentWriteOutMedium.getCloser()
       );
       serializer.open();
 
@@ -233,16 +235,17 @@ public class CompressedVSizeColumnarIntsSerializerTest
         segmentWriteOutMedium,
         "test",
         compressionStrategy,
-        Long.BYTES * 10000
+        Long.BYTES * 10000,
+        segmentWriteOutMedium.getCloser()
     );
     CompressedVSizeColumnarIntsSerializer writer = new 
CompressedVSizeColumnarIntsSerializer(
         columnName,
-        segmentWriteOutMedium,
         vals.length > 0 ? Ints.max(vals) : 0,
         chunkSize,
         byteOrder,
         compressionStrategy,
-        genericIndexed
+        genericIndexed,
+        segmentWriteOutMedium.getCloser()
     );
     writer.open();
     for (int val : vals) {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
index 88685c7caaf..11609ffdc97 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
@@ -186,7 +186,8 @@ public class CompressedVariableSizeBlobColumnTest
     final CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
     CompressedLongsSerializer serializer = new CompressedLongsSerializer(
         writeOutMedium,
-        compressionStrategy
+        compressionStrategy,
+        writeOutMedium.getCloser()
     );
     serializer.open();
 
@@ -204,6 +205,7 @@ public class CompressedVariableSizeBlobColumnTest
     serializer.writeTo(writer, smoosher);
     writer.close();
     smoosher.close();
+    writeOutMedium.close();
     SmooshedFileMapper fileMapper = SmooshedFileMapper.load(tmpFile);
 
     ByteBuffer base = fileMapper.mapFile(fileNameBase);
diff --git 
a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
index 6877416e764..29ba49913c4 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
@@ -206,7 +206,8 @@ public class 
V3CompressedVSizeColumnarMultiIntsSerializerTest
           "offset",
           offsetChunkFactor,
           byteOrder,
-          compressionStrategy
+          compressionStrategy,
+          segmentWriteOutMedium.getCloser()
       );
       CompressedVSizeColumnarIntsSerializer valueWriter = new 
CompressedVSizeColumnarIntsSerializer(
           TEST_COLUMN_NAME,
@@ -215,7 +216,8 @@ public class 
V3CompressedVSizeColumnarMultiIntsSerializerTest
           maxValue,
           valueChunkFactor,
           byteOrder,
-          compressionStrategy
+          compressionStrategy,
+          segmentWriteOutMedium.getCloser()
       );
       V3CompressedVSizeColumnarMultiIntsSerializer writer =
           new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, 
offsetWriter, valueWriter);
@@ -271,7 +273,6 @@ public class 
V3CompressedVSizeColumnarMultiIntsSerializerTest
     try (SegmentWriteOutMedium segmentWriteOutMedium = new 
OffHeapMemorySegmentWriteOutMedium()) {
       CompressedColumnarIntsSerializer offsetWriter = new 
CompressedColumnarIntsSerializer(
           TEST_COLUMN_NAME,
-          segmentWriteOutMedium,
           offsetChunkFactor,
           byteOrder,
           compressionStrategy,
@@ -279,24 +280,27 @@ public class 
V3CompressedVSizeColumnarMultiIntsSerializerTest
               segmentWriteOutMedium,
               "offset",
               compressionStrategy,
-              Long.BYTES * 250000
-          )
+              Long.BYTES * 250000,
+              segmentWriteOutMedium.getCloser()
+          ),
+          segmentWriteOutMedium.getCloser()
       );
 
       GenericIndexedWriter genericIndexed = 
GenericIndexedWriter.ofCompressedByteBuffers(
           segmentWriteOutMedium,
           "value",
           compressionStrategy,
-          Long.BYTES * 250000
+          Long.BYTES * 250000,
+          segmentWriteOutMedium.getCloser()
       );
       CompressedVSizeColumnarIntsSerializer valueWriter = new 
CompressedVSizeColumnarIntsSerializer(
           TEST_COLUMN_NAME,
-          segmentWriteOutMedium,
           maxValue,
           valueChunkFactor,
           byteOrder,
           compressionStrategy,
-          genericIndexed
+          genericIndexed,
+          segmentWriteOutMedium.getCloser()
       );
       V3CompressedVSizeColumnarMultiIntsSerializer writer =
           new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, 
offsetWriter, valueWriter);
@@ -347,7 +351,6 @@ public class 
V3CompressedVSizeColumnarMultiIntsSerializerTest
     ) {
       CompressedColumnarIntsSerializer offsetWriter = new 
CompressedColumnarIntsSerializer(
           TEST_COLUMN_NAME,
-          segmentWriteOutMedium,
           offsetChunkFactor,
           byteOrder,
           compressionStrategy,
@@ -355,24 +358,27 @@ public class 
V3CompressedVSizeColumnarMultiIntsSerializerTest
               segmentWriteOutMedium,
               "offset",
               compressionStrategy,
-              Long.BYTES * 250000
-          )
+              Long.BYTES * 250000,
+              segmentWriteOutMedium.getCloser()
+          ),
+          segmentWriteOutMedium.getCloser()
       );
 
       GenericIndexedWriter genericIndexed = 
GenericIndexedWriter.ofCompressedByteBuffers(
           segmentWriteOutMedium,
           "value",
           compressionStrategy,
-          Long.BYTES * 250000
+          Long.BYTES * 250000,
+          segmentWriteOutMedium.getCloser()
       );
       CompressedVSizeColumnarIntsSerializer valueWriter = new 
CompressedVSizeColumnarIntsSerializer(
           TEST_COLUMN_NAME,
-          segmentWriteOutMedium,
           maxValue,
           valueChunkFactor,
           byteOrder,
           compressionStrategy,
-          genericIndexed
+          genericIndexed,
+          segmentWriteOutMedium.getCloser()
       );
       V3CompressedVSizeColumnarMultiIntsSerializer writer =
           new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, 
offsetWriter, valueWriter);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to