clintropolis commented on a change in pull request #6016: Druid 'Shapeshifting'
Columns
URL: https://github.com/apache/incubator-druid/pull/6016#discussion_r207380680
##########
File path: processing/src/main/java/io/druid/segment/CompressedPools.java
##########
@@ -77,41 +102,245 @@ public BufferRecycler get()
return outputBytesPool.take();
}
- private static final NonBlockingPool<ByteBuffer> bigEndByteBufPool = new
StupidPool<ByteBuffer>(
- "bigEndByteBufPool",
- new Supplier<ByteBuffer>()
- {
- private final AtomicLong counter = new AtomicLong(0);
-
- @Override
- public ByteBuffer get()
+ private static NonBlockingPool<ByteBuffer> makeBufferPool(String name, int
size, ByteOrder order)
+ {
+ return new StupidPool<>(
+ name,
+ new Supplier<ByteBuffer>()
{
- log.info("Allocating new bigEndByteBuf[%,d]",
counter.incrementAndGet());
- return
ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.BIG_ENDIAN);
+ private final AtomicLong counter = new AtomicLong(0);
+
+ @Override
+ public ByteBuffer get()
+ {
+ log.info("Allocating new %s[%,d]", name,
counter.incrementAndGet());
+ return ByteBuffer.allocateDirect(size).order(order);
+ }
}
- }
- );
+ );
+ }
- private static final NonBlockingPool<ByteBuffer> littleEndByteBufPool = new
StupidPool<ByteBuffer>(
- "littleEndByteBufPool",
- new Supplier<ByteBuffer>()
- {
- private final AtomicLong counter = new AtomicLong(0);
+ private static NonBlockingPool<int[]> makeIntArrayPool(String name, int
size, int maxCache)
+ {
+ return new StupidPool<>(
+ name,
+ new Supplier<int[]>()
+ {
+ private final AtomicLong counter = new AtomicLong(0);
- @Override
- public ByteBuffer get()
+ @Override
+ public int[] get()
+ {
+ log.info("Allocating new %s[%,d]", name,
counter.incrementAndGet());
+ return new int[size];
+ }
+ },
+ 0,
+ maxCache
+ );
+ }
+
+ private static NonBlockingPool<SkippableIntegerCODEC>
makeFastpforPool(String name, int size)
+ {
+ return new StupidPool<>(
+ name,
+ new Supplier<SkippableIntegerCODEC>()
{
- log.info("Allocating new littleEndByteBuf[%,d]",
counter.incrementAndGet());
- return
ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
- }
- }
- );
+ private final AtomicLong counter = new AtomicLong(0);
+
+ @Override
+ public SkippableIntegerCODEC get()
+ {
+ log.info("Allocating new %s[%,d]", name,
counter.incrementAndGet());
+
+ Supplier<ByteBuffer> compressionBufferSupplier =
+ Suppliers.memoize(() -> ByteBuffer.allocateDirect(1 << 14));
+ return new SkippableComposition(
+ new FastPFOR(),
+ new VariableByte() {
+ // VariableByte allocates a buffer in compress method
instead of in constructor like fastpfor
+ // so override to re-use instead (and only allocate if
indexing)
+ @Override
+ protected ByteBuffer makeBuffer(int sizeInBytes)
+ {
+ ByteBuffer theBuffer = compressionBufferSupplier.get();
+ theBuffer.clear();
+ return theBuffer;
+ }
+ }
+ );
+ }
+ },
+ 0,
+ LEMIRE_FASTPFOR_CODEC_POOL_MAX_CACHE
+ );
+ }
+
+ private static final NonBlockingPool<ByteBuffer> bigEndByteBufPool =
+ makeBufferPool("bigEndByteBufPool", BUFFER_SIZE, ByteOrder.BIG_ENDIAN);
+
+ private static final NonBlockingPool<ByteBuffer> littleBigEndByteBufPool =
+ makeBufferPool("littleBigEndByteBufPool", SMALLER_BUFFER_SIZE,
ByteOrder.BIG_ENDIAN);
+
+ private static final NonBlockingPool<ByteBuffer> littlestBigEndByteBufPool =
+ makeBufferPool("littlestBigEndByteBufPool", SMALLEST_BUFFER_SIZE,
ByteOrder.BIG_ENDIAN);
+
+ private static final NonBlockingPool<ByteBuffer> littleEndByteBufPool =
+ makeBufferPool("littleEndByteBufPool", BUFFER_SIZE,
ByteOrder.LITTLE_ENDIAN);
+
+ private static final NonBlockingPool<ByteBuffer> littlerEndByteBufPool =
+ makeBufferPool("littlerEndByteBufPool", SMALLER_BUFFER_SIZE,
ByteOrder.LITTLE_ENDIAN);
+
+ private static final NonBlockingPool<ByteBuffer> littlestEndByteBufPool =
+ makeBufferPool("littlestEndByteBufPool", SMALLEST_BUFFER_SIZE,
ByteOrder.LITTLE_ENDIAN);
+
+ private static final NonBlockingPool<int[]>
shapeshiftIntsDecodedValuesArrayPool =
+ makeIntArrayPool(
+ "shapeshiftIntsDecodedValuesArrayPool",
+ INT_ARRAY_SIZE,
+ INT_DECODED_ARRAY_POOL_MAX_CACHE
+ );
+
+ private static final NonBlockingPool<int[]>
shapeshiftIntsEncodedValuesArrayPool =
+ makeIntArrayPool(
+ "shapeshiftIntsEncodedValuesArrayPool",
+ INT_ARRAY_SIZE + ENCODED_INTS_SHOULD_BE_ENOUGH,
+ INT_ENCODED_ARRAY_POOL_MAX_CACHE
+ );
+
+ private static final NonBlockingPool<int[]>
shapeshiftSmallerIntsDecodedValuesArrayPool =
+ makeIntArrayPool(
+ "shapeshiftSmallerIntsDecodedValuesArrayPool",
+ SMALLER_INT_ARRAY_SIZE,
+ INT_DECODED_ARRAY_POOL_MAX_CACHE
+ );
+
+ private static final NonBlockingPool<int[]>
shapeshiftSmallerIntsEncodedValuesArrayPool =
+ makeIntArrayPool(
+ "shapeshiftSmallerIntsEncodedValuesArrayPool",
+ SMALLER_INT_ARRAY_SIZE + ENCODED_INTS_SHOULD_BE_ENOUGH,
+ INT_ENCODED_ARRAY_POOL_MAX_CACHE
+ );
+
+ private static final NonBlockingPool<int[]>
shapeshiftSmallestIntsDecodedValuesArrayPool =
+ makeIntArrayPool(
+ "shapeshiftSmallestIntsDecodedValuesArrayPool",
+ SMALLEST_INT_ARRAY_SIZE,
+ INT_DECODED_ARRAY_POOL_MAX_CACHE
+ );
+
+ private static final NonBlockingPool<int[]>
shapeshiftSmallestIntsEncodedValuesArrayPool =
+ makeIntArrayPool(
+ "shapeshiftSmallestIntsEncodedValuesArrayPool",
+ SMALLEST_INT_ARRAY_SIZE + ENCODED_INTS_SHOULD_BE_ENOUGH,
+ INT_ENCODED_ARRAY_POOL_MAX_CACHE
+ );
Review comment:
I think this is probably worth revisiting; I pooled these at the same time
as I was trying to be more disciplined about the lifetime of objects I needed,
and query metrics on our test cluster were significantly better, but that also
could have been due to other changes.
Not pooling definitely puts more work on the garbage collector; let's
consider a full scan of a column with 3M rows using the largest block size,
which maps to 184 chunks and where all chunks are encoded with FastPFOR. A 64k
value array is needed for the lifetime of the column to hold the currently
decoded chunk. Additionally, during each chunk transition we need another 65k
temporary value array to copy the encoded values of the chunk to, and a
FastPFOR codec object which weighs in around 1M on heap (plus
~264k direct buffers) to decode the 65k array contents into the 64k array.
It is too heavy to just allocate one of each of these up front and keep these
alive for the lifetime of the column, and the column only needs them for a very
short period, so everything besides the 64k value array needs to be released
somehow or it makes things like merging and compaction very hard to do because
of huge heap requirements. Additionally, the FastPFOR codec objects definitely
need to be pooled, if only because of the direct buffers each instantiation
also allocates internally.
If we just consider the value array and temp arrays, that is 185 arrays
allocated and released (each requiring a contiguous chunk of memory), which is
~12.3MB. If the FastPFOR objects were also not pooled (ignoring the direct
buffers, pretend they are pooled), it pushes the amount allocated and recycled
up to ~200MB for scan of a single column.
Still, with a large enough heap it may not be a problem? As soon as I have
some spare time I'll consider testing again with the value and temp arrays,
leaving the codec object pool as is.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]