This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 313da988794 decouple column serializer compression closers from
SegmentWriteOutMedium to optionally allow serializers to release direct memory
allocated for compression earlier than when segment is completed (#16076)
313da988794 is described below
commit 313da988794d81b94ebbbe5bc3ba8b5e3c4fd6c7
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Mar 11 12:28:04 2024 -0700
decouple column serializer compression closers from SegmentWriteOutMedium
to optionally allow serializers to release direct memory allocated for
compression earlier than when segment is completed (#16076)
---
.../compression/BaseColumnarLongsBenchmark.java | 18 ++++++++----
.../FloatCompressionBenchmarkFileGenerator.java | 7 +++--
.../LongCompressionBenchmarkFileGenerator.java | 7 +++--
.../CompressedBigDecimalLongColumnSerializer.java | 11 +++++--
.../segment/DictionaryEncodedColumnMerger.java | 3 +-
.../druid/segment/DoubleColumnSerializer.java | 3 +-
.../druid/segment/DoubleColumnSerializerV2.java | 3 +-
.../druid/segment/FloatColumnSerializer.java | 3 +-
.../druid/segment/FloatColumnSerializerV2.java | 3 +-
.../org/apache/druid/segment/IndexMergerV9.java | 1 -
.../apache/druid/segment/LongColumnSerializer.java | 3 +-
.../druid/segment/LongColumnSerializerV2.java | 3 +-
.../data/BlockLayoutColumnarDoublesSerializer.java | 7 +++--
.../data/BlockLayoutColumnarFloatsSerializer.java | 7 +++--
.../data/BlockLayoutColumnarLongsSerializer.java | 15 +++++++---
.../segment/data/CompressedBlockSerializer.java | 11 ++++---
.../data/CompressedColumnarIntsSerializer.java | 15 +++++-----
.../segment/data/CompressedLongsSerializer.java | 10 +++++--
.../CompressedVSizeColumnarIntsSerializer.java | 24 ++++++++-------
...ompressedVariableSizedBlobColumnSerializer.java | 9 ++++--
.../druid/segment/data/CompressionFactory.java | 22 +++++++++-----
.../druid/segment/data/GenericIndexedWriter.java | 5 ++--
.../data/IntermediateColumnarLongsSerializer.java | 10 +++++--
...CompressedVSizeColumnarMultiIntsSerializer.java | 6 ++--
.../GlobalDictionaryEncodedFieldColumnWriter.java | 7 ++++-
.../nested/ScalarDoubleColumnSerializer.java | 3 +-
.../nested/ScalarDoubleFieldColumnWriter.java | 3 +-
.../segment/nested/ScalarLongColumnSerializer.java | 3 +-
.../nested/ScalarLongFieldColumnWriter.java | 3 +-
.../ScalarNestedCommonFormatColumnSerializer.java | 3 +-
.../segment/nested/VariantColumnSerializer.java | 3 +-
.../data/CompressedColumnarIntsSerializerTest.java | 14 +++++----
.../segment/data/CompressedDoublesSerdeTest.java | 12 ++++++--
.../segment/data/CompressedFloatsSerdeTest.java | 9 ++++--
.../data/CompressedLongsAutoEncodingSerdeTest.java | 8 +++--
.../segment/data/CompressedLongsSerdeTest.java | 12 ++++++--
.../CompressedVSizeColumnarIntsSerializerTest.java | 17 ++++++-----
.../data/CompressedVariableSizeBlobColumnTest.java | 4 ++-
...ressedVSizeColumnarMultiIntsSerializerTest.java | 34 +++++++++++++---------
39 files changed, 224 insertions(+), 117 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
index b26e1dc5892..f912f5e70b2 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
@@ -240,7 +240,8 @@ public class BaseColumnarLongsBenchmark
"lz4-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
- CompressionStrategy.LZ4
+ CompressionStrategy.LZ4,
+ writeOutMedium.getCloser()
);
break;
case "lz4-auto":
@@ -250,7 +251,8 @@ public class BaseColumnarLongsBenchmark
"lz4-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
- CompressionStrategy.LZ4
+ CompressionStrategy.LZ4,
+ writeOutMedium.getCloser()
);
break;
case "none-longs":
@@ -260,7 +262,8 @@ public class BaseColumnarLongsBenchmark
"none-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
- CompressionStrategy.NONE
+ CompressionStrategy.NONE,
+ writeOutMedium.getCloser()
);
break;
case "none-auto":
@@ -270,7 +273,8 @@ public class BaseColumnarLongsBenchmark
"none-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
- CompressionStrategy.NONE
+ CompressionStrategy.NONE,
+ writeOutMedium.getCloser()
);
break;
case "zstd-longs":
@@ -280,7 +284,8 @@ public class BaseColumnarLongsBenchmark
"zstd-longs",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.LONGS,
- CompressionStrategy.ZSTD
+ CompressionStrategy.ZSTD,
+ writeOutMedium.getCloser()
);
break;
case "zstd-auto":
@@ -290,7 +295,8 @@ public class BaseColumnarLongsBenchmark
"zstd-auto",
ByteOrder.LITTLE_ENDIAN,
CompressionFactory.LongEncodingStrategy.AUTO,
- CompressionStrategy.ZSTD
+ CompressionStrategy.ZSTD,
+ writeOutMedium.getCloser()
);
break;
default:
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
index 82709a6c4b3..9715db49e0e 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/FloatCompressionBenchmarkFileGenerator.java
@@ -29,6 +29,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.generator.ColumnValueGenerator;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.BufferedReader;
import java.io.File;
@@ -155,12 +156,14 @@ public class FloatCompressionBenchmarkFileGenerator
compFile.delete();
File dataFile = new File(dir, entry.getKey());
+ SegmentWriteOutMedium segmentWriteOutMedium = new
OffHeapMemorySegmentWriteOutMedium();
ColumnarFloatsSerializer writer =
CompressionFactory.getFloatSerializer(
"float-benchmark",
- new OffHeapMemorySegmentWriteOutMedium(),
+ segmentWriteOutMedium,
"float",
ByteOrder.nativeOrder(),
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
try (
BufferedReader br = Files.newBufferedReader(dataFile.toPath(),
StandardCharsets.UTF_8);
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
index 55d5f6b82bb..b1786e82f45 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/LongCompressionBenchmarkFileGenerator.java
@@ -29,6 +29,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.generator.ColumnValueGenerator;
import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.BufferedReader;
import java.io.File;
@@ -148,13 +149,15 @@ public class LongCompressionBenchmarkFileGenerator
compFile.delete();
File dataFile = new File(dir, entry.getKey());
+ SegmentWriteOutMedium segmentWriteOutMedium = new
OffHeapMemorySegmentWriteOutMedium();
ColumnarLongsSerializer writer =
CompressionFactory.getLongSerializer(
"long-benchmark",
- new OffHeapMemorySegmentWriteOutMedium(),
+ segmentWriteOutMedium,
"long",
ByteOrder.nativeOrder(),
encoding,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
try (
BufferedReader br = Files.newBufferedReader(dataFile.toPath(),
StandardCharsets.UTF_8);
diff --git
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
index e6b6539fe38..ef899c2f455 100644
---
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
+++
b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalLongColumnSerializer.java
@@ -49,7 +49,8 @@ public class CompressedBigDecimalLongColumnSerializer
implements GenericColumnSe
*/
public static CompressedBigDecimalLongColumnSerializer create(
SegmentWriteOutMedium segmentWriteOutMedium,
- String filenameBase)
+ String filenameBase
+ )
{
return new CompressedBigDecimalLongColumnSerializer(
CompressedVSizeColumnarIntsSerializer.create(
@@ -57,13 +58,17 @@ public class CompressedBigDecimalLongColumnSerializer
implements GenericColumnSe
segmentWriteOutMedium,
String.format(Locale.ROOT, "%s.scale", filenameBase),
16,
- CompressionStrategy.LZ4),
+ CompressionStrategy.LZ4,
+ segmentWriteOutMedium.getCloser()
+ ),
V3CompressedVSizeColumnarMultiIntsSerializer.create(
"dummy",
segmentWriteOutMedium,
String.format(Locale.ROOT, "%s.magnitude", filenameBase),
Integer.MAX_VALUE,
- CompressionStrategy.LZ4));
+ CompressionStrategy.LZ4
+ )
+ );
}
private final CompressedVSizeColumnarIntsSerializer scaleWriter;
diff --git
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
index 15f7f51120d..f1d9d7c5bb4 100644
---
a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
+++
b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
@@ -423,7 +423,8 @@ public abstract class DictionaryEncodedColumnMerger<T
extends Comparable<T>> imp
segmentWriteOutMedium,
filenameBase,
cardinality,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
} else {
encodedValueSerializer = new
VSizeColumnarIntsSerializer(segmentWriteOutMedium, cardinality);
diff --git
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
index cc525cb7ba8..1b87fdb9ceb 100644
---
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
@@ -72,7 +72,8 @@ public class DoubleColumnSerializer implements
GenericColumnSerializer<Object>
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
writer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
index 02903eb4a27..a678c695e43 100644
---
a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
+++
b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
@@ -99,7 +99,8 @@ public class DoubleColumnSerializerV2 implements
GenericColumnSerializer<Object>
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
writer.open();
nullValueBitmapWriter = new ByteBufferWriter<>(
diff --git
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
index b96d520e2e2..e1d23946d5b 100644
---
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
@@ -72,7 +72,8 @@ public class FloatColumnSerializer implements
GenericColumnSerializer<Object>
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
writer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
index b5371a0aac9..5930ae081ea 100644
---
a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
+++
b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
@@ -99,7 +99,8 @@ public class FloatColumnSerializerV2 implements
GenericColumnSerializer<Object>
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
writer.open();
nullValueBitmapWriter = new ByteBufferWriter<>(
diff --git
a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 6b1ff6b3195..05c721e5c56 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -202,7 +202,6 @@ public class IndexMergerV9 implements IndexMerger
mergers.add(
handler.makeMerger(
indexSpec,
-
segmentWriteOutMedium,
dimFormats.get(i).toColumnCapabilities(),
progress,
diff --git
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
index 1f8d03bebc6..6a4bcacfbb8 100644
---
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
@@ -80,7 +80,8 @@ public class LongColumnSerializer implements
GenericColumnSerializer<Object>
StringUtils.format("%s.long_column", filenameBase),
byteOrder,
encoding,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
writer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
index 364a7af6825..226cd8bafb0 100644
---
a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
+++
b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
@@ -105,7 +105,8 @@ public class LongColumnSerializerV2 implements
GenericColumnSerializer<Object>
StringUtils.format("%s.long_column", filenameBase),
byteOrder,
encoding,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
);
writer.open();
nullValueBitmapWriter = new ByteBufferWriter<>(
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
index 2b4612ecb3a..8c2dfb9c028 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
@@ -56,7 +56,8 @@ public class BlockLayoutColumnarDoublesSerializer implements
ColumnarDoublesSeri
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
- CompressionStrategy compression
+ CompressionStrategy compression,
+ Closer closer
)
{
this.columnName = columnName;
@@ -64,11 +65,11 @@ public class BlockLayoutColumnarDoublesSerializer
implements ColumnarDoublesSeri
segmentWriteOutMedium,
filenameBase,
compression,
- CompressedPools.BUFFER_SIZE
+ CompressedPools.BUFFER_SIZE,
+ closer
);
this.compression = compression;
CompressionStrategy.Compressor compressor = compression.getCompressor();
- Closer closer = segmentWriteOutMedium.getCloser();
this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE,
closer).order(byteOrder);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
index 94a3ef6319f..5640339a316 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
@@ -56,7 +56,8 @@ public class BlockLayoutColumnarFloatsSerializer implements
ColumnarFloatsSerial
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
- CompressionStrategy compression
+ CompressionStrategy compression,
+ Closer closer
)
{
this.columnName = columnName;
@@ -64,11 +65,11 @@ public class BlockLayoutColumnarFloatsSerializer implements
ColumnarFloatsSerial
segmentWriteOutMedium,
filenameBase,
compression,
- CompressedPools.BUFFER_SIZE
+ CompressedPools.BUFFER_SIZE,
+ closer
);
this.compression = compression;
CompressionStrategy.Compressor compressor = compression.getCompressor();
- Closer closer = segmentWriteOutMedium.getCloser();
this.endBuffer = compressor.allocateInBuffer(CompressedPools.BUFFER_SIZE,
closer).order(byteOrder);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index ff47a34cca4..37d468d62e4 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -19,13 +19,13 @@
package org.apache.druid.segment.data;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -59,17 +59,24 @@ public class BlockLayoutColumnarLongsSerializer implements
ColumnarLongsSerializ
String filenameBase,
ByteOrder byteOrder,
CompressionFactory.LongEncodingWriter writer,
- CompressionStrategy compression
+ CompressionStrategy compression,
+ Closer closer
)
{
this.columnName = columnName;
this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE);
int bufferSize = writer.getNumBytes(sizePer);
- this.flattener =
GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium,
filenameBase, compression, bufferSize);
+ this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
+ segmentWriteOutMedium,
+ filenameBase,
+ compression,
+ bufferSize,
+ closer
+ );
this.writer = writer;
this.compression = compression;
CompressionStrategy.Compressor compressor = compression.getCompressor();
- endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer),
segmentWriteOutMedium.getCloser()).order(byteOrder);
+ endBuffer = compressor.allocateInBuffer(writer.getNumBytes(sizePer),
closer).order(byteOrder);
writer.setBuffer(endBuffer);
numInsertedForNextFlush = sizePer;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
index 07208160af8..d5beedcae51 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedBlockSerializer.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.data;
import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
@@ -60,18 +61,16 @@ public class CompressedBlockSerializer implements Serializer
public CompressedBlockSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
-
CompressionStrategy compression,
- int blockSize
+ int blockSize,
+ Closer closer
)
{
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.compression = compression;
this.compressor = compression.getCompressor();
- this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize,
segmentWriteOutMedium.getCloser())
- .order(ByteOrder.nativeOrder());
- this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize,
segmentWriteOutMedium.getCloser())
- .order(ByteOrder.nativeOrder());
+ this.uncompressedDataBuffer = compressor.allocateInBuffer(blockSize,
closer).order(ByteOrder.nativeOrder());
+ this.compressedDataBuffer = compressor.allocateOutBuffer(blockSize,
closer).order(ByteOrder.nativeOrder());
}
public void open() throws IOException
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
index a0442d95adf..cc724ba1cc7 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
@@ -58,12 +58,12 @@ public class CompressedColumnarIntsSerializer extends
SingleValueColumnarIntsSer
final String filenameBase,
final int chunkFactor,
final ByteOrder byteOrder,
- final CompressionStrategy compression
+ final CompressionStrategy compression,
+ final Closer closer
)
{
this(
columnName,
- segmentWriteOutMedium,
chunkFactor,
byteOrder,
compression,
@@ -71,18 +71,20 @@ public class CompressedColumnarIntsSerializer extends
SingleValueColumnarIntsSer
segmentWriteOutMedium,
filenameBase,
compression,
- chunkFactor * Integer.BYTES
- )
+ chunkFactor * Integer.BYTES,
+ closer
+ ),
+ closer
);
}
CompressedColumnarIntsSerializer(
final String columnName,
- final SegmentWriteOutMedium segmentWriteOutMedium,
final int chunkFactor,
final ByteOrder byteOrder,
final CompressionStrategy compression,
- final GenericIndexedWriter<ByteBuffer> flattener
+ final GenericIndexedWriter<ByteBuffer> flattener,
+ final Closer closer
)
{
this.columnName = columnName;
@@ -90,7 +92,6 @@ public class CompressedColumnarIntsSerializer extends
SingleValueColumnarIntsSer
this.compression = compression;
this.flattener = flattener;
CompressionStrategy.Compressor compressor = compression.getCompressor();
- Closer closer = segmentWriteOutMedium.getCloser();
this.endBuffer = compressor.allocateInBuffer(chunkFactor * Integer.BYTES,
closer).order(byteOrder);
this.numInserted = 0;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
index cbf9211ec17..cfc56b84c21 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedLongsSerializer.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.data;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.Serializer;
@@ -34,12 +35,17 @@ public class CompressedLongsSerializer implements Serializer
private final CompressedBlockSerializer blockSerializer;
private final ByteBuffer longValueConverter =
ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder());
- public CompressedLongsSerializer(SegmentWriteOutMedium
segmentWriteOutMedium, CompressionStrategy compression)
+ public CompressedLongsSerializer(
+ SegmentWriteOutMedium segmentWriteOutMedium,
+ CompressionStrategy compression,
+ Closer closer
+ )
{
this.blockSerializer = new CompressedBlockSerializer(
segmentWriteOutMedium,
compression,
- CompressedPools.BUFFER_SIZE
+ CompressedPools.BUFFER_SIZE,
+ closer
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
index 6060be2cc3c..84f4799e6d2 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
@@ -20,13 +20,13 @@
package org.apache.druid.segment.data;
import org.apache.druid.common.utils.ByteUtils;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -51,7 +51,8 @@ public class CompressedVSizeColumnarIntsSerializer extends
SingleValueColumnarIn
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int maxValue,
- final CompressionStrategy compression
+ final CompressionStrategy compression,
+ final Closer closer
)
{
return new CompressedVSizeColumnarIntsSerializer(
@@ -61,7 +62,8 @@ public class CompressedVSizeColumnarIntsSerializer extends
SingleValueColumnarIn
maxValue,
CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue),
IndexIO.BYTE_ORDER,
- compression
+ compression,
+ closer
);
}
@@ -84,12 +86,12 @@ public class CompressedVSizeColumnarIntsSerializer extends
SingleValueColumnarIn
final int maxValue,
final int chunkFactor,
final ByteOrder byteOrder,
- final CompressionStrategy compression
+ final CompressionStrategy compression,
+ final Closer closer
)
{
this(
columnName,
- segmentWriteOutMedium,
maxValue,
chunkFactor,
byteOrder,
@@ -98,19 +100,21 @@ public class CompressedVSizeColumnarIntsSerializer extends
SingleValueColumnarIn
segmentWriteOutMedium,
filenameBase,
compression,
- sizePer(maxValue, chunkFactor)
- )
+ sizePer(maxValue, chunkFactor),
+ closer
+ ),
+ closer
);
}
CompressedVSizeColumnarIntsSerializer(
final String columnName,
- final SegmentWriteOutMedium segmentWriteOutMedium,
final int maxValue,
final int chunkFactor,
final ByteOrder byteOrder,
final CompressionStrategy compression,
- final GenericIndexedWriter<ByteBuffer> flattener
+ final GenericIndexedWriter<ByteBuffer> flattener,
+ final Closer closer
)
{
this.columnName = columnName;
@@ -122,7 +126,7 @@ public class CompressedVSizeColumnarIntsSerializer extends
SingleValueColumnarIn
this.flattener = flattener;
this.intBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
CompressionStrategy.Compressor compressor = compression.getCompressor();
- this.endBuffer = compressor.allocateInBuffer(chunkBytes,
segmentWriteOutMedium.getCloser()).order(byteOrder);
+ this.endBuffer = compressor.allocateInBuffer(chunkBytes,
closer).order(byteOrder);
this.numInserted = 0;
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
index 19aa9e445a1..6693daa4326 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressedVariableSizedBlobColumnSerializer.java
@@ -66,13 +66,18 @@ public class CompressedVariableSizedBlobColumnSerializer
implements Serializer
{
numValues = 0;
currentOffset = 0;
- offsetsSerializer = new CompressedLongsSerializer(segmentWriteOutMedium,
compression);
+ offsetsSerializer = new CompressedLongsSerializer(
+ segmentWriteOutMedium,
+ compression,
+ segmentWriteOutMedium.getCloser()
+ );
offsetsSerializer.open();
valuesSerializer = new CompressedBlockSerializer(
segmentWriteOutMedium,
compression,
- CompressedPools.BUFFER_SIZE
+ CompressedPools.BUFFER_SIZE,
+ segmentWriteOutMedium.getCloser()
);
valuesSerializer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
index 10943316b70..dde6a440d9e 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes;
@@ -324,7 +325,8 @@ public class CompressionFactory
String filenameBase,
ByteOrder order,
LongEncodingStrategy encodingStrategy,
- CompressionStrategy compressionStrategy
+ CompressionStrategy compressionStrategy,
+ Closer closer
)
{
if (encodingStrategy == LongEncodingStrategy.AUTO) {
@@ -333,7 +335,8 @@ public class CompressionFactory
segmentWriteOutMedium,
filenameBase,
order,
- compressionStrategy
+ compressionStrategy,
+ closer
);
} else if (encodingStrategy == LongEncodingStrategy.LONGS) {
if (compressionStrategy == CompressionStrategy.NONE) {
@@ -349,7 +352,8 @@ public class CompressionFactory
filenameBase,
order,
new LongsLongEncodingWriter(order),
- compressionStrategy
+ compressionStrategy,
+ closer
);
}
} else {
@@ -379,7 +383,8 @@ public class CompressionFactory
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
- CompressionStrategy compressionStrategy
+ CompressionStrategy compressionStrategy,
+ Closer closer
)
{
if (compressionStrategy == CompressionStrategy.NONE) {
@@ -390,7 +395,8 @@ public class CompressionFactory
segmentWriteOutMedium,
filenameBase,
order,
- compressionStrategy
+ compressionStrategy,
+ closer
);
}
}
@@ -417,7 +423,8 @@ public class CompressionFactory
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
- CompressionStrategy compression
+ CompressionStrategy compression,
+ Closer closer
)
{
if (compression == CompressionStrategy.NONE) {
@@ -428,7 +435,8 @@ public class CompressionFactory
segmentWriteOutMedium,
filenameBase,
byteOrder,
- compression
+ compression,
+ closer
);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index a69d645be39..8b38125322b 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -77,13 +77,14 @@ public class GenericIndexedWriter<T> implements
DictionaryWriter<T>
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final CompressionStrategy compressionStrategy,
- final int bufferSize
+ final int bufferSize,
+ final Closer closer
)
{
GenericIndexedWriter<ByteBuffer> writer = new GenericIndexedWriter<>(
segmentWriteOutMedium,
filenameBase,
- compressedByteBuffersWriteObjectStrategy(compressionStrategy,
bufferSize, segmentWriteOutMedium.getCloser())
+ compressedByteBuffersWriteObjectStrategy(compressionStrategy,
bufferSize, closer)
);
writer.objectsSorted = false;
return writer;
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
index c0f9355350a..7403f8dfd20 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
@@ -24,11 +24,11 @@ import it.unimi.dsi.fastutil.longs.Long2IntMap;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
@@ -45,6 +45,7 @@ public class IntermediateColumnarLongsSerializer implements
ColumnarLongsSeriali
private final String filenameBase;
private final ByteOrder order;
private final CompressionStrategy compression;
+ private final Closer closer;
private int numInserted = 0;
@@ -64,7 +65,8 @@ public class IntermediateColumnarLongsSerializer implements
ColumnarLongsSeriali
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
- CompressionStrategy compression
+ CompressionStrategy compression,
+ Closer closer
)
{
this.columnName = columnName;
@@ -72,6 +74,7 @@ public class IntermediateColumnarLongsSerializer implements
ColumnarLongsSeriali
this.filenameBase = filenameBase;
this.order = order;
this.compression = compression;
+ this.closer = closer;
}
@Override
@@ -141,7 +144,8 @@ public class IntermediateColumnarLongsSerializer implements
ColumnarLongsSeriali
filenameBase,
order,
writer,
- compression
+ compression,
+ closer
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
index f6690293012..0fac36399d1 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
@@ -51,7 +51,8 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer
extends ColumnarMultiI
filenameBase,
CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
IndexIO.BYTE_ORDER,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
),
new CompressedVSizeColumnarIntsSerializer(
columnName,
@@ -60,7 +61,8 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer
extends ColumnarMultiI
maxValue,
CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue),
IndexIO.BYTE_ORDER,
- compression
+ compression,
+ segmentWriteOutMedium.getCloser()
)
);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
index efa25865fa4..aa6a71ae754 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java
@@ -29,6 +29,7 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.java.util.common.logger.Logger;
@@ -81,6 +82,8 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
protected final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
+ protected final Closer fieldResourceCloser = Closer.create();
+
protected FixedIndexedIntWriter intermediateValueWriter;
// maybe someday we allow no bitmap indexes or multi-value columns
protected int flags = DictionaryEncodedColumnPartSerde.NO_FLAGS;
@@ -300,6 +303,7 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
}
finally {
tmpWriteoutMedium.close();
+ fieldResourceCloser.close();
}
}
@@ -312,7 +316,8 @@ public abstract class
GlobalDictionaryEncodedFieldColumnWriter<T>
medium,
columnName,
maxId,
- indexSpec.getDimensionCompression()
+ indexSpec.getDimensionCompression(),
+ fieldResourceCloser
);
} else {
encodedValueSerializer = new VSizeColumnarIntsSerializer(medium, maxId);
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
index e077282f98f..874b8b309a4 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
@@ -95,7 +95,8 @@ public class ScalarDoubleColumnSerializer extends
ScalarNestedCommonFormatColumn
segmentWriteOutMedium,
StringUtils.format("%s.double_column", name),
ByteOrder.nativeOrder(),
- indexSpec.getDimensionCompression()
+ indexSpec.getDimensionCompression(),
+ segmentWriteOutMedium.getCloser()
);
doublesSerializer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
index 144e848d831..8ccd528715b 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleFieldColumnWriter.java
@@ -67,7 +67,8 @@ public final class ScalarDoubleFieldColumnWriter extends
GlobalDictionaryEncoded
segmentWriteOutMedium,
StringUtils.format("%s.double_column", fieldName),
ByteOrder.nativeOrder(),
- indexSpec.getDimensionCompression()
+ indexSpec.getDimensionCompression(),
+ fieldResourceCloser
);
doublesSerializer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
index bfb966365e2..46b70d9907c 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
@@ -97,7 +97,8 @@ public class ScalarLongColumnSerializer extends
ScalarNestedCommonFormatColumnSe
StringUtils.format("%s.long_column", name),
ByteOrder.nativeOrder(),
indexSpec.getLongEncoding(),
- indexSpec.getDimensionCompression()
+ indexSpec.getDimensionCompression(),
+ segmentWriteOutMedium.getCloser()
);
longsSerializer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
index 4ca317edb01..66b5eca18d9 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongFieldColumnWriter.java
@@ -68,7 +68,8 @@ public final class ScalarLongFieldColumnWriter extends
GlobalDictionaryEncodedFi
StringUtils.format("%s.long_column", fieldName),
ByteOrder.nativeOrder(),
indexSpec.getLongEncoding(),
- indexSpec.getDimensionCompression()
+ indexSpec.getDimensionCompression(),
+ fieldResourceCloser
);
longsSerializer.open();
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
index 2caa19ad8d6..771cdb7fb5b 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
@@ -184,7 +184,8 @@ public abstract class
ScalarNestedCommonFormatColumnSerializer<T> extends Nested
segmentWriteOutMedium,
filenameBase,
dictionaryWriter.getCardinality(),
- compressionToUse
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
);
encodedValueSerializer.open();
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 58464b2c9e6..abd88b57df0 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -342,7 +342,8 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
segmentWriteOutMedium,
filenameBase,
cardinality,
- compressionToUse
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
);
encodedValueSerializer.open();
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
index 52dca67257c..6e1c4229084 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
@@ -167,7 +167,8 @@ public class CompressedColumnarIntsSerializerTest
"test",
CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
byteOrder,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -196,7 +197,8 @@ public class CompressedColumnarIntsSerializerTest
"test",
chunkFactor,
byteOrder,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
CompressedColumnarIntsSupplier supplierFromList =
CompressedColumnarIntsSupplier.fromList(
IntArrayList.wrap(vals),
@@ -227,6 +229,7 @@ public class CompressedColumnarIntsSerializerTest
Assert.assertEquals(vals[i], columnarInts.get(i));
}
CloseableUtils.closeAndWrapExceptions(columnarInts);
+ CloseableUtils.closeAndWrapExceptions(segmentWriteOutMedium);
}
private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception
@@ -236,7 +239,6 @@ public class CompressedColumnarIntsSerializerTest
CompressedColumnarIntsSerializer writer = new
CompressedColumnarIntsSerializer(
"test",
- segmentWriteOutMedium,
chunkFactor,
byteOrder,
compressionStrategy,
@@ -244,8 +246,10 @@ public class CompressedColumnarIntsSerializerTest
segmentWriteOutMedium,
"test",
compressionStrategy,
- Long.BYTES * 10000
- )
+ Long.BYTES * 10000,
+ segmentWriteOutMedium.getCloser()
+ ),
+ segmentWriteOutMedium.getCloser()
);
writer.open();
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index 2f94a772b4d..be472a71ed9 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -147,7 +147,8 @@ public class CompressedDoublesSerdeTest
segmentWriteOutMedium,
"test",
order,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -160,12 +161,14 @@ public class CompressedDoublesSerdeTest
public void testWithValues(double[] values) throws Exception
{
+ final SegmentWriteOutMedium segmentWriteOutMedium = new
OffHeapMemorySegmentWriteOutMedium();
ColumnarDoublesSerializer serializer =
CompressionFactory.getDoubleSerializer(
"test",
- new OffHeapMemorySegmentWriteOutMedium(),
+ segmentWriteOutMedium,
"test",
order,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -190,6 +193,9 @@ public class CompressedDoublesSerdeTest
}
testConcurrentThreadReads(supplier, doubles, values);
}
+ finally {
+ segmentWriteOutMedium.close();
+ }
}
private void tryFill(ColumnarDoubles indexed, double[] vals, final int
startIndex, final int size)
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
index 14b935be2f3..02e320f46a4 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
@@ -154,7 +154,8 @@ public class CompressedFloatsSerdeTest
segmentWriteOutMedium,
"test",
order,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -167,12 +168,14 @@ public class CompressedFloatsSerdeTest
public void testWithValues(float[] values) throws Exception
{
+ SegmentWriteOutMedium segmentWriteOutMedium = new
OffHeapMemorySegmentWriteOutMedium();
ColumnarFloatsSerializer serializer =
CompressionFactory.getFloatSerializer(
"test",
- new OffHeapMemorySegmentWriteOutMedium(),
+ segmentWriteOutMedium,
"test",
order,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
index 87daaddbe42..0fd5bbf6f89 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -95,13 +96,15 @@ public class CompressedLongsAutoEncodingSerdeTest
public void testValues(long[] values) throws Exception
{
+ SegmentWriteOutMedium segmentWriteOutMedium = new
OffHeapMemorySegmentWriteOutMedium();
ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
"test",
- new OffHeapMemorySegmentWriteOutMedium(),
+ segmentWriteOutMedium,
"test",
order,
encodingStrategy,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -119,6 +122,7 @@ public class CompressedLongsAutoEncodingSerdeTest
assertIndexMatchesVals(longs, values);
longs.close();
+ segmentWriteOutMedium.close();
}
private void assertIndexMatchesVals(ColumnarLongs indexed, long[] vals)
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
index b643ee43d83..ba35a03bff5 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
@@ -154,7 +154,8 @@ public class CompressedLongsSerdeTest
"test",
order,
encodingStrategy,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -173,13 +174,15 @@ public class CompressedLongsSerdeTest
public void testValues(long[] values) throws Exception
{
+ SegmentWriteOutMedium segmentWriteOutMedium = new
OffHeapMemorySegmentWriteOutMedium();
ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
"test",
- new OffHeapMemorySegmentWriteOutMedium(),
+ segmentWriteOutMedium,
"test",
order,
encodingStrategy,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -206,6 +209,9 @@ public class CompressedLongsSerdeTest
testSupplierSerde(supplier, values);
testConcurrentThreadReads(supplier, longs, values);
}
+ finally {
+ segmentWriteOutMedium.close();
+ }
}
private void tryFill(ColumnarLongs indexed, long[] vals, final int
startIndex, final int size)
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
index bb5868f6df2..c06e11c90d9 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
@@ -124,7 +124,8 @@ public class CompressedVSizeColumnarIntsSerializerTest
vals.length > 0 ? Ints.max(vals) : 0,
chunkSize,
byteOrder,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
CompressedVSizeColumnarIntsSupplier supplierFromList =
CompressedVSizeColumnarIntsSupplier.fromList(
IntArrayList.wrap(vals),
@@ -197,16 +198,17 @@ public class CompressedVSizeColumnarIntsSerializerTest
segmentWriteOutMedium,
"test",
compressionStrategy,
- Long.BYTES * 10000
+ Long.BYTES * 10000,
+ segmentWriteOutMedium.getCloser()
);
CompressedVSizeColumnarIntsSerializer serializer = new
CompressedVSizeColumnarIntsSerializer(
"test",
- segmentWriteOutMedium,
maxValue,
maxChunkSize,
byteOrder,
compressionStrategy,
- genericIndexed
+ genericIndexed,
+ segmentWriteOutMedium.getCloser()
);
serializer.open();
@@ -233,16 +235,17 @@ public class CompressedVSizeColumnarIntsSerializerTest
segmentWriteOutMedium,
"test",
compressionStrategy,
- Long.BYTES * 10000
+ Long.BYTES * 10000,
+ segmentWriteOutMedium.getCloser()
);
CompressedVSizeColumnarIntsSerializer writer = new
CompressedVSizeColumnarIntsSerializer(
columnName,
- segmentWriteOutMedium,
vals.length > 0 ? Ints.max(vals) : 0,
chunkSize,
byteOrder,
compressionStrategy,
- genericIndexed
+ genericIndexed,
+ segmentWriteOutMedium.getCloser()
);
writer.open();
for (int val : vals) {
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
index 88685c7caaf..11609ffdc97 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/CompressedVariableSizeBlobColumnTest.java
@@ -186,7 +186,8 @@ public class CompressedVariableSizeBlobColumnTest
final CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
CompressedLongsSerializer serializer = new CompressedLongsSerializer(
writeOutMedium,
- compressionStrategy
+ compressionStrategy,
+ writeOutMedium.getCloser()
);
serializer.open();
@@ -204,6 +205,7 @@ public class CompressedVariableSizeBlobColumnTest
serializer.writeTo(writer, smoosher);
writer.close();
smoosher.close();
+ writeOutMedium.close();
SmooshedFileMapper fileMapper = SmooshedFileMapper.load(tmpFile);
ByteBuffer base = fileMapper.mapFile(fileNameBase);
diff --git
a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
index 6877416e764..29ba49913c4 100644
---
a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
@@ -206,7 +206,8 @@ public class
V3CompressedVSizeColumnarMultiIntsSerializerTest
"offset",
offsetChunkFactor,
byteOrder,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
CompressedVSizeColumnarIntsSerializer valueWriter = new
CompressedVSizeColumnarIntsSerializer(
TEST_COLUMN_NAME,
@@ -215,7 +216,8 @@ public class
V3CompressedVSizeColumnarMultiIntsSerializerTest
maxValue,
valueChunkFactor,
byteOrder,
- compressionStrategy
+ compressionStrategy,
+ segmentWriteOutMedium.getCloser()
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME,
offsetWriter, valueWriter);
@@ -271,7 +273,6 @@ public class
V3CompressedVSizeColumnarMultiIntsSerializerTest
try (SegmentWriteOutMedium segmentWriteOutMedium = new
OffHeapMemorySegmentWriteOutMedium()) {
CompressedColumnarIntsSerializer offsetWriter = new
CompressedColumnarIntsSerializer(
TEST_COLUMN_NAME,
- segmentWriteOutMedium,
offsetChunkFactor,
byteOrder,
compressionStrategy,
@@ -279,24 +280,27 @@ public class
V3CompressedVSizeColumnarMultiIntsSerializerTest
segmentWriteOutMedium,
"offset",
compressionStrategy,
- Long.BYTES * 250000
- )
+ Long.BYTES * 250000,
+ segmentWriteOutMedium.getCloser()
+ ),
+ segmentWriteOutMedium.getCloser()
);
GenericIndexedWriter genericIndexed =
GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
"value",
compressionStrategy,
- Long.BYTES * 250000
+ Long.BYTES * 250000,
+ segmentWriteOutMedium.getCloser()
);
CompressedVSizeColumnarIntsSerializer valueWriter = new
CompressedVSizeColumnarIntsSerializer(
TEST_COLUMN_NAME,
- segmentWriteOutMedium,
maxValue,
valueChunkFactor,
byteOrder,
compressionStrategy,
- genericIndexed
+ genericIndexed,
+ segmentWriteOutMedium.getCloser()
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME,
offsetWriter, valueWriter);
@@ -347,7 +351,6 @@ public class
V3CompressedVSizeColumnarMultiIntsSerializerTest
) {
CompressedColumnarIntsSerializer offsetWriter = new
CompressedColumnarIntsSerializer(
TEST_COLUMN_NAME,
- segmentWriteOutMedium,
offsetChunkFactor,
byteOrder,
compressionStrategy,
@@ -355,24 +358,27 @@ public class
V3CompressedVSizeColumnarMultiIntsSerializerTest
segmentWriteOutMedium,
"offset",
compressionStrategy,
- Long.BYTES * 250000
- )
+ Long.BYTES * 250000,
+ segmentWriteOutMedium.getCloser()
+ ),
+ segmentWriteOutMedium.getCloser()
);
GenericIndexedWriter genericIndexed =
GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
"value",
compressionStrategy,
- Long.BYTES * 250000
+ Long.BYTES * 250000,
+ segmentWriteOutMedium.getCloser()
);
CompressedVSizeColumnarIntsSerializer valueWriter = new
CompressedVSizeColumnarIntsSerializer(
TEST_COLUMN_NAME,
- segmentWriteOutMedium,
maxValue,
valueChunkFactor,
byteOrder,
compressionStrategy,
- genericIndexed
+ genericIndexed,
+ segmentWriteOutMedium.getCloser()
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME,
offsetWriter, valueWriter);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]