This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit dfd66a4296e9237ef287c4539977a70ccb963bc3 Author: Wei Zhong <[email protected]> AuthorDate: Thu Jan 9 09:59:26 2020 +0800 [FLINK-15338][python] Cherry-pick NETTY#8955 to fix the TM Metaspace memory leak problem in shaded netty when submitting PyFlink UDF jobs multiple times. --- .../grpc/v1p21p0/io/netty/buffer/PoolArena.java | 818 +++++++++++++++++++++ .../v1p21p0/io/netty/buffer/PoolThreadCache.java | 508 +++++++++++++ .../io/netty/buffer/PooledByteBufAllocator.java | 640 ++++++++++++++++ tools/maven/suppressions.xml | 3 + 4 files changed, 1969 insertions(+) diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java new file mode 100644 index 0000000..c573759 --- /dev/null +++ b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java @@ -0,0 +1,818 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer; + +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.LongCounter; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero; +import static java.lang.Math.max; + +// This class is copied from Netty's io.netty.buffer.PoolArena, +// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030). +// +// Changed lines: 284, 295, 297~300 + +abstract class PoolArena<T> implements PoolArenaMetric { + static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); + + enum SizeClass { + Tiny, + Small, + Normal + } + + static final int numTinySubpagePools = 512 >>> 4; + + final PooledByteBufAllocator parent; + + private final int maxOrder; + final int pageSize; + final int pageShifts; + final int chunkSize; + final int subpageOverflowMask; + final int numSmallSubpagePools; + final int directMemoryCacheAlignment; + final int directMemoryCacheAlignmentMask; + private final PoolSubpage<T>[] tinySubpagePools; + private final PoolSubpage<T>[] smallSubpagePools; + + private final PoolChunkList<T> q050; + private final PoolChunkList<T> q025; + private final PoolChunkList<T> q000; + private final PoolChunkList<T> qInit; + private final PoolChunkList<T> q075; + private final PoolChunkList<T> q100; + + private final List<PoolChunkListMetric> chunkListMetrics; + + // Metrics for allocations and deallocations + private long allocationsNormal; + // We need to use the LongCounter here as this is not guarded via synchronized block. 
+ private final LongCounter allocationsTiny = PlatformDependent.newLongCounter(); + private final LongCounter allocationsSmall = PlatformDependent.newLongCounter(); + private final LongCounter allocationsHuge = PlatformDependent.newLongCounter(); + private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter(); + + private long deallocationsTiny; + private long deallocationsSmall; + private long deallocationsNormal; + + // We need to use the LongCounter here as this is not guarded via synchronized block. + private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter(); + + // Number of thread caches backed by this arena. + final AtomicInteger numThreadCaches = new AtomicInteger(); + + // TODO: Test if adding padding helps under contention + //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + protected PoolArena(PooledByteBufAllocator parent, int pageSize, + int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) { + this.parent = parent; + this.pageSize = pageSize; + this.maxOrder = maxOrder; + this.pageShifts = pageShifts; + this.chunkSize = chunkSize; + directMemoryCacheAlignment = cacheAlignment; + directMemoryCacheAlignmentMask = cacheAlignment - 1; + subpageOverflowMask = ~(pageSize - 1); + tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); + for (int i = 0; i < tinySubpagePools.length; i ++) { + tinySubpagePools[i] = newSubpagePoolHead(pageSize); + } + + numSmallSubpagePools = pageShifts - 9; + smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); + for (int i = 0; i < smallSubpagePools.length; i ++) { + smallSubpagePools[i] = newSubpagePoolHead(pageSize); + } + + q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize); + q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize); + q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize); + q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize); + q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize); 
+ qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize); + + q100.prevList(q075); + q075.prevList(q050); + q050.prevList(q025); + q025.prevList(q000); + q000.prevList(null); + qInit.prevList(qInit); + + List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6); + metrics.add(qInit); + metrics.add(q000); + metrics.add(q025); + metrics.add(q050); + metrics.add(q075); + metrics.add(q100); + chunkListMetrics = Collections.unmodifiableList(metrics); + } + + private PoolSubpage<T> newSubpagePoolHead(int pageSize) { + PoolSubpage<T> head = new PoolSubpage<T>(pageSize); + head.prev = head; + head.next = head; + return head; + } + + @SuppressWarnings("unchecked") + private PoolSubpage<T>[] newSubpagePoolArray(int size) { + return new PoolSubpage[size]; + } + + abstract boolean isDirect(); + + PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { + PooledByteBuf<T> buf = newByteBuf(maxCapacity); + allocate(cache, buf, reqCapacity); + return buf; + } + + static int tinyIdx(int normCapacity) { + return normCapacity >>> 4; + } + + static int smallIdx(int normCapacity) { + int tableIdx = 0; + int i = normCapacity >>> 10; + while (i != 0) { + i >>>= 1; + tableIdx ++; + } + return tableIdx; + } + + // capacity < pageSize + boolean isTinyOrSmall(int normCapacity) { + return (normCapacity & subpageOverflowMask) == 0; + } + + // normCapacity < 512 + static boolean isTiny(int normCapacity) { + return (normCapacity & 0xFFFFFE00) == 0; + } + + private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { + final int normCapacity = normalizeCapacity(reqCapacity); + if (isTinyOrSmall(normCapacity)) { // capacity < pageSize + int tableIdx; + PoolSubpage<T>[] table; + boolean tiny = isTiny(normCapacity); + if (tiny) { // < 512 + if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; + } + tableIdx = tinyIdx(normCapacity); 
+ table = tinySubpagePools; + } else { + if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; + } + tableIdx = smallIdx(normCapacity); + table = smallSubpagePools; + } + + final PoolSubpage<T> head = table[tableIdx]; + + /** + * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and + * {@link PoolChunk#free(long)} may modify the doubly linked list as well. + */ + synchronized (head) { + final PoolSubpage<T> s = head.next; + if (s != head) { + assert s.doNotDestroy && s.elemSize == normCapacity; + long handle = s.allocate(); + assert handle >= 0; + s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity); + incTinySmallAllocation(tiny); + return; + } + } + synchronized (this) { + allocateNormal(buf, reqCapacity, normCapacity); + } + + incTinySmallAllocation(tiny); + return; + } + if (normCapacity <= chunkSize) { + if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { + // was able to allocate out of the cache so move on + return; + } + synchronized (this) { + allocateNormal(buf, reqCapacity, normCapacity); + ++allocationsNormal; + } + } else { + // Huge allocations are never served via the cache so just call allocateHuge + allocateHuge(buf, reqCapacity); + } + } + + // Method must be called inside synchronized(this) { ... } block + private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { + if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || + q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || + q075.allocate(buf, reqCapacity, normCapacity)) { + return; + } + + // Add a new chunk. 
+ PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); + boolean success = c.allocate(buf, reqCapacity, normCapacity); + assert success; + qInit.add(c); + } + + private void incTinySmallAllocation(boolean tiny) { + if (tiny) { + allocationsTiny.increment(); + } else { + allocationsSmall.increment(); + } + } + + private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) { + PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); + activeBytesHuge.add(chunk.chunkSize()); + buf.initUnpooled(chunk, reqCapacity); + allocationsHuge.increment(); + } + + void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) { + if (chunk.unpooled) { + int size = chunk.chunkSize(); + destroyChunk(chunk); + activeBytesHuge.add(-size); + deallocationsHuge.increment(); + } else { + SizeClass sizeClass = sizeClass(normCapacity); + if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) { + // cached so not free it. + return; + } + + freeChunk(chunk, handle, sizeClass, nioBuffer, false); + } + } + + private SizeClass sizeClass(int normCapacity) { + if (!isTinyOrSmall(normCapacity)) { + return SizeClass.Normal; + } + return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small; + } + + void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) { + final boolean destroyChunk; + synchronized (this) { + // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this + // may fail due lazy class-loading in for example tomcat. 
+ if (!finalizer) { + switch (sizeClass) { + case Normal: + ++deallocationsNormal; + break; + case Small: + ++deallocationsSmall; + break; + case Tiny: + ++deallocationsTiny; + break; + default: + throw new Error(); + } + } + destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer); + } + if (destroyChunk) { + // destroyChunk not need to be called while holding the synchronized lock. + destroyChunk(chunk); + } + } + + PoolSubpage<T> findSubpagePoolHead(int elemSize) { + int tableIdx; + PoolSubpage<T>[] table; + if (isTiny(elemSize)) { // < 512 + tableIdx = elemSize >>> 4; + table = tinySubpagePools; + } else { + tableIdx = 0; + elemSize >>>= 10; + while (elemSize != 0) { + elemSize >>>= 1; + tableIdx ++; + } + table = smallSubpagePools; + } + + return table[tableIdx]; + } + + int normalizeCapacity(int reqCapacity) { + checkPositiveOrZero(reqCapacity, "reqCapacity"); + + if (reqCapacity >= chunkSize) { + return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity); + } + + if (!isTiny(reqCapacity)) { // >= 512 + // Doubled + + int normalizedCapacity = reqCapacity; + normalizedCapacity --; + normalizedCapacity |= normalizedCapacity >>> 1; + normalizedCapacity |= normalizedCapacity >>> 2; + normalizedCapacity |= normalizedCapacity >>> 4; + normalizedCapacity |= normalizedCapacity >>> 8; + normalizedCapacity |= normalizedCapacity >>> 16; + normalizedCapacity ++; + + if (normalizedCapacity < 0) { + normalizedCapacity >>>= 1; + } + assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0; + + return normalizedCapacity; + } + + if (directMemoryCacheAlignment > 0) { + return alignCapacity(reqCapacity); + } + + // Quantum-spaced + if ((reqCapacity & 15) == 0) { + return reqCapacity; + } + + return (reqCapacity & ~15) + 16; + } + + int alignCapacity(int reqCapacity) { + int delta = reqCapacity & directMemoryCacheAlignmentMask; + return delta == 0 ? 
reqCapacity : reqCapacity + directMemoryCacheAlignment - delta; + } + + void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) { + if (newCapacity < 0 || newCapacity > buf.maxCapacity()) { + throw new IllegalArgumentException("newCapacity: " + newCapacity); + } + + int oldCapacity = buf.length; + if (oldCapacity == newCapacity) { + return; + } + + PoolChunk<T> oldChunk = buf.chunk; + ByteBuffer oldNioBuffer = buf.tmpNioBuf; + long oldHandle = buf.handle; + T oldMemory = buf.memory; + int oldOffset = buf.offset; + int oldMaxLength = buf.maxLength; + int readerIndex = buf.readerIndex(); + int writerIndex = buf.writerIndex(); + + allocate(parent.threadCache(), buf, newCapacity); + if (newCapacity > oldCapacity) { + memoryCopy( + oldMemory, oldOffset, + buf.memory, buf.offset, oldCapacity); + } else if (newCapacity < oldCapacity) { + if (readerIndex < newCapacity) { + if (writerIndex > newCapacity) { + writerIndex = newCapacity; + } + memoryCopy( + oldMemory, oldOffset + readerIndex, + buf.memory, buf.offset + readerIndex, writerIndex - readerIndex); + } else { + readerIndex = writerIndex = newCapacity; + } + } + + buf.setIndex(readerIndex, writerIndex); + + if (freeOldMemory) { + free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache); + } + } + + @Override + public int numThreadCaches() { + return numThreadCaches.get(); + } + + @Override + public int numTinySubpages() { + return tinySubpagePools.length; + } + + @Override + public int numSmallSubpages() { + return smallSubpagePools.length; + } + + @Override + public int numChunkLists() { + return chunkListMetrics.size(); + } + + @Override + public List<PoolSubpageMetric> tinySubpages() { + return subPageMetricList(tinySubpagePools); + } + + @Override + public List<PoolSubpageMetric> smallSubpages() { + return subPageMetricList(smallSubpagePools); + } + + @Override + public List<PoolChunkListMetric> chunkLists() { + return chunkListMetrics; + } + + private static List<PoolSubpageMetric> 
subPageMetricList(PoolSubpage<?>[] pages) { + List<PoolSubpageMetric> metrics = new ArrayList<PoolSubpageMetric>(); + for (PoolSubpage<?> head : pages) { + if (head.next == head) { + continue; + } + PoolSubpage<?> s = head.next; + for (;;) { + metrics.add(s); + s = s.next; + if (s == head) { + break; + } + } + } + return metrics; + } + + @Override + public long numAllocations() { + final long allocsNormal; + synchronized (this) { + allocsNormal = allocationsNormal; + } + return allocationsTiny.value() + allocationsSmall.value() + allocsNormal + allocationsHuge.value(); + } + + @Override + public long numTinyAllocations() { + return allocationsTiny.value(); + } + + @Override + public long numSmallAllocations() { + return allocationsSmall.value(); + } + + @Override + public synchronized long numNormalAllocations() { + return allocationsNormal; + } + + @Override + public long numDeallocations() { + final long deallocs; + synchronized (this) { + deallocs = deallocationsTiny + deallocationsSmall + deallocationsNormal; + } + return deallocs + deallocationsHuge.value(); + } + + @Override + public synchronized long numTinyDeallocations() { + return deallocationsTiny; + } + + @Override + public synchronized long numSmallDeallocations() { + return deallocationsSmall; + } + + @Override + public synchronized long numNormalDeallocations() { + return deallocationsNormal; + } + + @Override + public long numHugeAllocations() { + return allocationsHuge.value(); + } + + @Override + public long numHugeDeallocations() { + return deallocationsHuge.value(); + } + + @Override + public long numActiveAllocations() { + long val = allocationsTiny.value() + allocationsSmall.value() + allocationsHuge.value() + - deallocationsHuge.value(); + synchronized (this) { + val += allocationsNormal - (deallocationsTiny + deallocationsSmall + deallocationsNormal); + } + return max(val, 0); + } + + @Override + public long numActiveTinyAllocations() { + return max(numTinyAllocations() - 
numTinyDeallocations(), 0); + } + + @Override + public long numActiveSmallAllocations() { + return max(numSmallAllocations() - numSmallDeallocations(), 0); + } + + @Override + public long numActiveNormalAllocations() { + final long val; + synchronized (this) { + val = allocationsNormal - deallocationsNormal; + } + return max(val, 0); + } + + @Override + public long numActiveHugeAllocations() { + return max(numHugeAllocations() - numHugeDeallocations(), 0); + } + + @Override + public long numActiveBytes() { + long val = activeBytesHuge.value(); + synchronized (this) { + for (int i = 0; i < chunkListMetrics.size(); i++) { + for (PoolChunkMetric m: chunkListMetrics.get(i)) { + val += m.chunkSize(); + } + } + } + return max(0, val); + } + + protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize); + protected abstract PoolChunk<T> newUnpooledChunk(int capacity); + protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity); + protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length); + protected abstract void destroyChunk(PoolChunk<T> chunk); + + @Override + public synchronized String toString() { + StringBuilder buf = new StringBuilder() + .append("Chunk(s) at 0~25%:") + .append(StringUtil.NEWLINE) + .append(qInit) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 0~50%:") + .append(StringUtil.NEWLINE) + .append(q000) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 25~75%:") + .append(StringUtil.NEWLINE) + .append(q025) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 50~100%:") + .append(StringUtil.NEWLINE) + .append(q050) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 75~100%:") + .append(StringUtil.NEWLINE) + .append(q075) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 100%:") + .append(StringUtil.NEWLINE) + .append(q100) + .append(StringUtil.NEWLINE) + .append("tiny subpages:"); + appendPoolSubPages(buf, tinySubpagePools); + buf.append(StringUtil.NEWLINE) + 
.append("small subpages:"); + appendPoolSubPages(buf, smallSubpagePools); + buf.append(StringUtil.NEWLINE); + + return buf.toString(); + } + + private static void appendPoolSubPages(StringBuilder buf, PoolSubpage<?>[] subpages) { + for (int i = 0; i < subpages.length; i ++) { + PoolSubpage<?> head = subpages[i]; + if (head.next == head) { + continue; + } + + buf.append(StringUtil.NEWLINE) + .append(i) + .append(": "); + PoolSubpage<?> s = head.next; + for (;;) { + buf.append(s); + s = s.next; + if (s == head) { + break; + } + } + } + } + + @Override + protected final void finalize() throws Throwable { + try { + super.finalize(); + } finally { + destroyPoolSubPages(smallSubpagePools); + destroyPoolSubPages(tinySubpagePools); + destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100); + } + } + + private static void destroyPoolSubPages(PoolSubpage<?>[] pages) { + for (PoolSubpage<?> page : pages) { + page.destroy(); + } + } + + private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) { + for (PoolChunkList<T> chunkList: chunkLists) { + chunkList.destroy(this); + } + } + + static final class HeapArena extends PoolArena<byte[]> { + + HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, + int pageShifts, int chunkSize, int directMemoryCacheAlignment) { + super(parent, pageSize, maxOrder, pageShifts, chunkSize, + directMemoryCacheAlignment); + } + + private static byte[] newByteArray(int size) { + return PlatformDependent.allocateUninitializedArray(size); + } + + @Override + boolean isDirect() { + return false; + } + + @Override + protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { + return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0); + } + + @Override + protected PoolChunk<byte[]> newUnpooledChunk(int capacity) { + return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0); + } + + @Override + protected void 
destroyChunk(PoolChunk<byte[]> chunk) { + // Rely on GC. + } + + @Override + protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) { + return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity) + : PooledHeapByteBuf.newInstance(maxCapacity); + } + + @Override + protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) { + if (length == 0) { + return; + } + + System.arraycopy(src, srcOffset, dst, dstOffset, length); + } + } + + static final class DirectArena extends PoolArena<ByteBuffer> { + + DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, + int pageShifts, int chunkSize, int directMemoryCacheAlignment) { + super(parent, pageSize, maxOrder, pageShifts, chunkSize, + directMemoryCacheAlignment); + } + + @Override + boolean isDirect() { + return true; + } + + // mark as package-private, only for unit test + int offsetCacheLine(ByteBuffer memory) { + // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) will + // throw an NPE. + int remainder = HAS_UNSAFE + ? 
(int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask) + : 0; + + // offset = alignment - address & (alignment - 1) + return directMemoryCacheAlignment - remainder; + } + + @Override + protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, + int pageShifts, int chunkSize) { + if (directMemoryCacheAlignment == 0) { + return new PoolChunk<ByteBuffer>(this, + allocateDirect(chunkSize), pageSize, maxOrder, + pageShifts, chunkSize, 0); + } + final ByteBuffer memory = allocateDirect(chunkSize + + directMemoryCacheAlignment); + return new PoolChunk<ByteBuffer>(this, memory, pageSize, + maxOrder, pageShifts, chunkSize, + offsetCacheLine(memory)); + } + + @Override + protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) { + if (directMemoryCacheAlignment == 0) { + return new PoolChunk<ByteBuffer>(this, + allocateDirect(capacity), capacity, 0); + } + final ByteBuffer memory = allocateDirect(capacity + + directMemoryCacheAlignment); + return new PoolChunk<ByteBuffer>(this, memory, capacity, + offsetCacheLine(memory)); + } + + private static ByteBuffer allocateDirect(int capacity) { + return PlatformDependent.useDirectBufferNoCleaner() ? 
+ PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity); + } + + @Override + protected void destroyChunk(PoolChunk<ByteBuffer> chunk) { + if (PlatformDependent.useDirectBufferNoCleaner()) { + PlatformDependent.freeDirectNoCleaner(chunk.memory); + } else { + PlatformDependent.freeDirectBuffer(chunk.memory); + } + } + + @Override + protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { + if (HAS_UNSAFE) { + return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); + } else { + return PooledDirectByteBuf.newInstance(maxCapacity); + } + } + + @Override + protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) { + if (length == 0) { + return; + } + + if (HAS_UNSAFE) { + PlatformDependent.copyMemory( + PlatformDependent.directBufferAddress(src) + srcOffset, + PlatformDependent.directBufferAddress(dst) + dstOffset, length); + } else { + // We must duplicate the NIO buffers because they may be accessed by other Netty buffers. + src = src.duplicate(); + dst = dst.duplicate(); + src.position(srcOffset).limit(srcOffset + length); + dst.position(dstOffset); + dst.put(src); + } + } + } +} diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java new file mode 100644 index 0000000..37244e5 --- /dev/null +++ b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java @@ -0,0 +1,508 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer; + + +import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero; + +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer.PoolArena.SizeClass; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler.Handle; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.MathUtil; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +// This class is copied from Netty's io.netty.buffer.PoolThreadCache, +// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030). +// +// Changed lines: 235, 242, 246~251, 268, 275, 280, 284, 426~427, 430, 435, 453, 458, 463~467, 469 + +/** + * Acts a Thread cache for allocations. This implementation is moduled after + * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the descripted + * technics of + * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919"> + * Scalable memory allocation using jemalloc</a>. 
+ */ +final class PoolThreadCache { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class); + + final PoolArena<byte[]> heapArena; + final PoolArena<ByteBuffer> directArena; + + // Hold the caches for the different size classes, which are tiny, small and normal. + private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; + private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; + private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; + private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; + private final MemoryRegionCache<byte[]>[] normalHeapCaches; + private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; + + // Used for bitshifting when calculate the index of normal caches later + private final int numShiftsNormalDirect; + private final int numShiftsNormalHeap; + private final int freeSweepAllocationThreshold; + private final AtomicBoolean freed = new AtomicBoolean(); + + private int allocations; + + // TODO: Test if adding padding helps under contention + //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, + int tinyCacheSize, int smallCacheSize, int normalCacheSize, + int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { + checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity"); + this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; + this.heapArena = heapArena; + this.directArena = directArena; + if (directArena != null) { + tinySubPageDirectCaches = createSubPageCaches( + tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); + smallSubPageDirectCaches = createSubPageCaches( + smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); + + numShiftsNormalDirect = log2(directArena.pageSize); + normalDirectCaches = createNormalCaches( + normalCacheSize, maxCachedBufferCapacity, directArena); + + 
directArena.numThreadCaches.getAndIncrement(); + } else { + // No directArea is configured so just null out all caches + tinySubPageDirectCaches = null; + smallSubPageDirectCaches = null; + normalDirectCaches = null; + numShiftsNormalDirect = -1; + } + if (heapArena != null) { + // Create the caches for the heap allocations + tinySubPageHeapCaches = createSubPageCaches( + tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); + smallSubPageHeapCaches = createSubPageCaches( + smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); + + numShiftsNormalHeap = log2(heapArena.pageSize); + normalHeapCaches = createNormalCaches( + normalCacheSize, maxCachedBufferCapacity, heapArena); + + heapArena.numThreadCaches.getAndIncrement(); + } else { + // No heapArea is configured so just null out all caches + tinySubPageHeapCaches = null; + smallSubPageHeapCaches = null; + normalHeapCaches = null; + numShiftsNormalHeap = -1; + } + + // Only check if there are caches in use. + if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null + || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) + && freeSweepAllocationThreshold < 1) { + throw new IllegalArgumentException("freeSweepAllocationThreshold: " + + freeSweepAllocationThreshold + " (expected: > 0)"); + } + } + + private static <T> MemoryRegionCache<T>[] createSubPageCaches( + int cacheSize, int numCaches, SizeClass sizeClass) { + if (cacheSize > 0 && numCaches > 0) { + @SuppressWarnings("unchecked") + MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; + for (int i = 0; i < cache.length; i++) { + // TODO: maybe use cacheSize / cache.length + cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); + } + return cache; + } else { + return null; + } + } + + private static <T> MemoryRegionCache<T>[] createNormalCaches( + int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { + if (cacheSize > 0 && 
maxCachedBufferCapacity > 0) { + int max = Math.min(area.chunkSize, maxCachedBufferCapacity); + int arraySize = Math.max(1, log2(max / area.pageSize) + 1); + + @SuppressWarnings("unchecked") + MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize]; + for (int i = 0; i < cache.length; i++) { + cache[i] = new NormalMemoryRegionCache<T>(cacheSize); + } + return cache; + } else { + return null; + } + } + + private static int log2(int val) { + int res = 0; + while (val > 1) { + val >>= 1; + res++; + } + return res; + } + + /** + * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { + return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); + } + + /** + * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { + return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); + } + + /** + * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise + */ + boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { + return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { + if (cache == null) { + // no cache found so just return false here + return false; + } + boolean allocated = cache.allocate(buf, reqCapacity); + if (++ allocations >= freeSweepAllocationThreshold) { + allocations = 0; + trim(); + } + return allocated; + } + + /** + * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room. 
+ * Returns {@code true} if it fit into the cache {@code false} otherwise. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer, + long handle, int normCapacity, SizeClass sizeClass) { + MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass); + if (cache == null) { + return false; + } + return cache.add(chunk, nioBuffer, handle); + } + + private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) { + switch (sizeClass) { + case Normal: + return cacheForNormal(area, normCapacity); + case Small: + return cacheForSmall(area, normCapacity); + case Tiny: + return cacheForTiny(area, normCapacity); + default: + throw new Error(); + } + } + + /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner. + @Override + protected void finalize() throws Throwable { + try { + super.finalize(); + } finally { + free(true); + } + } + + /** + * Should be called if the Thread that uses this cache is about to exit to release resources out of the cache + */ + void free(boolean finalizer) { + // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure + // we only call this one time. 
+ if (freed.compareAndSet(false, true)) { + int numFreed = free(tinySubPageDirectCaches, finalizer) + + free(smallSubPageDirectCaches, finalizer) + + free(normalDirectCaches, finalizer) + + free(tinySubPageHeapCaches, finalizer) + + free(smallSubPageHeapCaches, finalizer) + + free(normalHeapCaches, finalizer); + + if (numFreed > 0 && logger.isDebugEnabled()) { + logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, + Thread.currentThread().getName()); + } + + if (directArena != null) { + directArena.numThreadCaches.getAndDecrement(); + } + + if (heapArena != null) { + heapArena.numThreadCaches.getAndDecrement(); + } + } + } + + private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) { + if (caches == null) { + return 0; + } + + int numFreed = 0; + for (MemoryRegionCache<?> c: caches) { + numFreed += free(c, finalizer); + } + return numFreed; + } + + private static int free(MemoryRegionCache<?> cache, boolean finalizer) { + if (cache == null) { + return 0; + } + return cache.free(finalizer); + } + + void trim() { + trim(tinySubPageDirectCaches); + trim(smallSubPageDirectCaches); + trim(normalDirectCaches); + trim(tinySubPageHeapCaches); + trim(smallSubPageHeapCaches); + trim(normalHeapCaches); + } + + private static void trim(MemoryRegionCache<?>[] caches) { + if (caches == null) { + return; + } + for (MemoryRegionCache<?> c: caches) { + trim(c); + } + } + + private static void trim(MemoryRegionCache<?> cache) { + if (cache == null) { + return; + } + cache.trim(); + } + + private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { + int idx = PoolArena.tinyIdx(normCapacity); + if (area.isDirect()) { + return cache(tinySubPageDirectCaches, idx); + } + return cache(tinySubPageHeapCaches, idx); + } + + private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) { + int idx = PoolArena.smallIdx(normCapacity); + if (area.isDirect()) { + return cache(smallSubPageDirectCaches, idx); + } + return 
cache(smallSubPageHeapCaches, idx); + } + + private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) { + if (area.isDirect()) { + int idx = log2(normCapacity >> numShiftsNormalDirect); + return cache(normalDirectCaches, idx); + } + int idx = log2(normCapacity >> numShiftsNormalHeap); + return cache(normalHeapCaches, idx); + } + + private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) { + if (cache == null || idx > cache.length - 1) { + return null; + } + return cache[idx]; + } + + /** + * Cache used for buffers which are backed by TINY or SMALL size. + */ + private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> { + SubPageMemoryRegionCache(int size, SizeClass sizeClass) { + super(size, sizeClass); + } + + @Override + protected void initBuf( + PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) { + chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity); + } + } + + /** + * Cache used for buffers which are backed by NORMAL size. + */ + private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> { + NormalMemoryRegionCache(int size) { + super(size, SizeClass.Normal); + } + + @Override + protected void initBuf( + PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) { + chunk.initBuf(buf, nioBuffer, handle, reqCapacity); + } + } + + private abstract static class MemoryRegionCache<T> { + private final int size; + private final Queue<Entry<T>> queue; + private final SizeClass sizeClass; + private int allocations; + + MemoryRegionCache(int size, SizeClass sizeClass) { + this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); + queue = PlatformDependent.newFixedMpscQueue(this.size); + this.sizeClass = sizeClass; + } + + /** + * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions. 
+ */ + protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, + PooledByteBuf<T> buf, int reqCapacity); + + /** + * Add to cache if not already full. + */ + @SuppressWarnings("unchecked") + public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) { + Entry<T> entry = newEntry(chunk, nioBuffer, handle); + boolean queued = queue.offer(entry); + if (!queued) { + // If it was not possible to cache the chunk, immediately recycle the entry + entry.recycle(); + } + + return queued; + } + + /** + * Allocate something out of the cache if possible and remove the entry from the cache. + */ + public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { + Entry<T> entry = queue.poll(); + if (entry == null) { + return false; + } + initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity); + entry.recycle(); + + // allocations is not thread-safe which is fine as this is only called from the same thread all time. + ++ allocations; + return true; + } + + /** + * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. + */ + public final int free(boolean finalizer) { + return free(Integer.MAX_VALUE, finalizer); + } + + private int free(int max, boolean finalizer) { + int numFreed = 0; + for (; numFreed < max; numFreed++) { + Entry<T> entry = queue.poll(); + if (entry != null) { + freeEntry(entry, finalizer); + } else { + // all cleared + return numFreed; + } + } + return numFreed; + } + + /** + * Free up cached {@link PoolChunk}s if not allocated frequently enough. 
+ */ + public final void trim() { + int free = size - allocations; + allocations = 0; + + // We not even allocated all the number that are + if (free > 0) { + free(free, false); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void freeEntry(Entry entry, boolean finalizer) { + PoolChunk chunk = entry.chunk; + long handle = entry.handle; + ByteBuffer nioBuffer = entry.nioBuffer; + + if (!finalizer) { + // recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of + // a finalizer. + entry.recycle(); + } + + chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer); + } + + static final class Entry<T> { + final Handle<Entry<?>> recyclerHandle; + PoolChunk<T> chunk; + ByteBuffer nioBuffer; + long handle = -1; + + Entry(Handle<Entry<?>> recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + void recycle() { + chunk = null; + nioBuffer = null; + handle = -1; + recyclerHandle.recycle(this); + } + } + + @SuppressWarnings("rawtypes") + private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) { + Entry entry = RECYCLER.get(); + entry.chunk = chunk; + entry.nioBuffer = nioBuffer; + entry.handle = handle; + return entry; + } + + @SuppressWarnings("rawtypes") + private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { + @SuppressWarnings("unchecked") + @Override + protected Entry newObject(Handle<Entry> handle) { + return new Entry(handle); + } + }; + } +} diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java new file mode 100644 index 0000000..2124bdc --- /dev/null +++ b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java @@ -0,0 +1,640 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you 
under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer; + +import static org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero; + +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.NettyRuntime; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocal; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocalThread; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.SystemPropertyUtil; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger; +import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +// This class is copied from Netty's io.netty.buffer.PooledByteBufAllocator, +// can be removed after Beam bumps its shaded netty version to 1.22+ (BEAM-9030). 
+// +// Changed lines: 458 + +public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class); + private static final int DEFAULT_NUM_HEAP_ARENA; + private static final int DEFAULT_NUM_DIRECT_ARENA; + + private static final int DEFAULT_PAGE_SIZE; + private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk + private static final int DEFAULT_TINY_CACHE_SIZE; + private static final int DEFAULT_SMALL_CACHE_SIZE; + private static final int DEFAULT_NORMAL_CACHE_SIZE; + private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; + private static final int DEFAULT_CACHE_TRIM_INTERVAL; + private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS; + private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT; + static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK; + + private static final int MIN_PAGE_SIZE = 4096; + private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); + + static { + int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192); + Throwable pageSizeFallbackCause = null; + try { + validateAndCalculatePageShifts(defaultPageSize); + } catch (Throwable t) { + pageSizeFallbackCause = t; + defaultPageSize = 8192; + } + DEFAULT_PAGE_SIZE = defaultPageSize; + + int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11); + Throwable maxOrderFallbackCause = null; + try { + validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder); + } catch (Throwable t) { + maxOrderFallbackCause = t; + defaultMaxOrder = 11; + } + DEFAULT_MAX_ORDER = defaultMaxOrder; + + // Determine reasonable default for nHeapArena and nDirectArena. + // Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory. 
+ final Runtime runtime = Runtime.getRuntime(); + + /* + * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the + * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as + * allocation and de-allocation needs to be synchronized on the PoolArena. + * + * See https://github.com/netty/netty/issues/3888. + */ + final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2; + final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER; + DEFAULT_NUM_HEAP_ARENA = Math.max(0, + SystemPropertyUtil.getInt( + "io.netty.allocator.numHeapArenas", + (int) Math.min( + defaultMinNumArena, + runtime.maxMemory() / defaultChunkSize / 2 / 3))); + DEFAULT_NUM_DIRECT_ARENA = Math.max(0, + SystemPropertyUtil.getInt( + "io.netty.allocator.numDirectArenas", + (int) Math.min( + defaultMinNumArena, + PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3))); + + // cache sizes + DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); + DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); + DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); + + // 32 kb is the default maximum capacity of the cached buffer. 
Similar to what is explained in + // 'Scalable memory allocation using jemalloc' + DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt( + "io.netty.allocator.maxCachedBufferCapacity", 32 * 1024); + + // the number of threshold of allocations when cached entries will be freed up if not frequently used + DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( + "io.netty.allocator.cacheTrimInterval", 8192); + + DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean( + "io.netty.allocator.useCacheForAllThreads", true); + + DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt( + "io.netty.allocator.directMemoryCacheAlignment", 0); + + // Use 1023 by default as we use an ArrayDeque as backing storage which will then allocate an internal array + // of 1024 elements. Otherwise we would allocate 2048 and only use 1024 which is wasteful. + DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt( + "io.netty.allocator.maxCachedByteBuffersPerChunk", 1023); + + if (logger.isDebugEnabled()) { + logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); + logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); + if (pageSizeFallbackCause == null) { + logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE); + } else { + logger.debug("-Dio.netty.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause); + } + if (maxOrderFallbackCause == null) { + logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER); + } else { + logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause); + } + logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER); + logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.normalCacheSize: {}", 
DEFAULT_NORMAL_CACHE_SIZE); + logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); + logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL); + logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS); + logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}", + DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK); + } + } + + public static final PooledByteBufAllocator DEFAULT = + new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + + private final PoolArena<byte[]>[] heapArenas; + private final PoolArena<ByteBuffer>[] directArenas; + private final int tinyCacheSize; + private final int smallCacheSize; + private final int normalCacheSize; + private final List<PoolArenaMetric> heapArenaMetrics; + private final List<PoolArenaMetric> directArenaMetrics; + private final PoolThreadLocalCache threadCache; + private final int chunkSize; + private final PooledByteBufAllocatorMetric metric; + + public PooledByteBufAllocator() { + this(false); + } + + @SuppressWarnings("deprecation") + public PooledByteBufAllocator(boolean preferDirect) { + this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER); + } + + @SuppressWarnings("deprecation") + public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { + this(false, nHeapArena, nDirectArena, pageSize, maxOrder); + } + + /** + * @deprecated use + * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)} + */ + @Deprecated + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, + DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); + } + + /** + * @deprecated use + * {@link 
PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)} + */ + @Deprecated + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, + int tinyCacheSize, int smallCacheSize, int normalCacheSize) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize, + normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); + } + + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, + int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, + int smallCacheSize, int normalCacheSize, + boolean useCacheForAllThreads) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, + tinyCacheSize, smallCacheSize, normalCacheSize, + useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); + } + + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, + int tinyCacheSize, int smallCacheSize, int normalCacheSize, + boolean useCacheForAllThreads, int directMemoryCacheAlignment) { + super(preferDirect); + threadCache = new PoolThreadLocalCache(useCacheForAllThreads); + this.tinyCacheSize = tinyCacheSize; + this.smallCacheSize = smallCacheSize; + this.normalCacheSize = normalCacheSize; + chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); + + checkPositiveOrZero(nHeapArena, "nHeapArena"); + checkPositiveOrZero(nDirectArena, "nDirectArena"); + + checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment"); + if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) { + throw new IllegalArgumentException("directMemoryCacheAlignment is not supported"); + } + + if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) { + throw new IllegalArgumentException("directMemoryCacheAlignment: " + + directMemoryCacheAlignment + " (expected: power 
of two)"); + } + + int pageShifts = validateAndCalculatePageShifts(pageSize); + + if (nHeapArena > 0) { + heapArenas = newArenaArray(nHeapArena); + List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length); + for (int i = 0; i < heapArenas.length; i ++) { + PoolArena.HeapArena arena = new PoolArena.HeapArena(this, + pageSize, maxOrder, pageShifts, chunkSize, + directMemoryCacheAlignment); + heapArenas[i] = arena; + metrics.add(arena); + } + heapArenaMetrics = Collections.unmodifiableList(metrics); + } else { + heapArenas = null; + heapArenaMetrics = Collections.emptyList(); + } + + if (nDirectArena > 0) { + directArenas = newArenaArray(nDirectArena); + List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); + for (int i = 0; i < directArenas.length; i ++) { + PoolArena.DirectArena arena = new PoolArena.DirectArena( + this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); + directArenas[i] = arena; + metrics.add(arena); + } + directArenaMetrics = Collections.unmodifiableList(metrics); + } else { + directArenas = null; + directArenaMetrics = Collections.emptyList(); + } + metric = new PooledByteBufAllocatorMetric(this); + } + + @SuppressWarnings("unchecked") + private static <T> PoolArena<T>[] newArenaArray(int size) { + return new PoolArena[size]; + } + + private static int validateAndCalculatePageShifts(int pageSize) { + if (pageSize < MIN_PAGE_SIZE) { + throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")"); + } + + if ((pageSize & pageSize - 1) != 0) { + throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)"); + } + + // Logarithm base 2. At this point we know that pageSize is a power of two. 
+ return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize); + } + + private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) { + if (maxOrder > 14) { + throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)"); + } + + // Ensure the resulting chunkSize does not overflow. + int chunkSize = pageSize; + for (int i = maxOrder; i > 0; i --) { + if (chunkSize > MAX_CHUNK_SIZE / 2) { + throw new IllegalArgumentException(String.format( + "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE)); + } + chunkSize <<= 1; + } + return chunkSize; + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + PoolThreadCache cache = threadCache.get(); + PoolArena<byte[]> heapArena = cache.heapArena; + + final ByteBuf buf; + if (heapArena != null) { + buf = heapArena.allocate(cache, initialCapacity, maxCapacity); + } else { + buf = PlatformDependent.hasUnsafe() ? + new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : + new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); + } + + return toLeakAwareBuffer(buf); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + PoolThreadCache cache = threadCache.get(); + PoolArena<ByteBuffer> directArena = cache.directArena; + + final ByteBuf buf; + if (directArena != null) { + buf = directArena.allocate(cache, initialCapacity, maxCapacity); + } else { + buf = PlatformDependent.hasUnsafe() ? 
+ UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : + new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); + } + + return toLeakAwareBuffer(buf); + } + + /** + * Default number of heap arenas - System Property: io.netty.allocator.numHeapArenas - default 2 * cores + */ + public static int defaultNumHeapArena() { + return DEFAULT_NUM_HEAP_ARENA; + } + + /** + * Default number of direct arenas - System Property: io.netty.allocator.numDirectArenas - default 2 * cores + */ + public static int defaultNumDirectArena() { + return DEFAULT_NUM_DIRECT_ARENA; + } + + /** + * Default buffer page size - System Property: io.netty.allocator.pageSize - default 8192 + */ + public static int defaultPageSize() { + return DEFAULT_PAGE_SIZE; + } + + /** + * Default maximum order - System Property: io.netty.allocator.maxOrder - default 11 + */ + public static int defaultMaxOrder() { + return DEFAULT_MAX_ORDER; + } + + /** + * Default thread caching behavior - System Property: io.netty.allocator.useCacheForAllThreads - default true + */ + public static boolean defaultUseCacheForAllThreads() { + return DEFAULT_USE_CACHE_FOR_ALL_THREADS; + } + + /** + * Default prefer direct - System Property: io.netty.noPreferDirect - default false + */ + public static boolean defaultPreferDirect() { + return PlatformDependent.directBufferPreferred(); + } + + /** + * Default tiny cache size - System Property: io.netty.allocator.tinyCacheSize - default 512 + */ + public static int defaultTinyCacheSize() { + return DEFAULT_TINY_CACHE_SIZE; + } + + /** + * Default small cache size - System Property: io.netty.allocator.smallCacheSize - default 256 + */ + public static int defaultSmallCacheSize() { + return DEFAULT_SMALL_CACHE_SIZE; + } + + /** + * Default normal cache size - System Property: io.netty.allocator.normalCacheSize - default 64 + */ + public static int defaultNormalCacheSize() { + return DEFAULT_NORMAL_CACHE_SIZE; + } + + /** + * Return {@code true} if direct 
memory cache alignment is supported, {@code false} otherwise. + */ + public static boolean isDirectMemoryCacheAlignmentSupported() { + return PlatformDependent.hasUnsafe(); + } + + @Override + public boolean isDirectBufferPooled() { + return directArenas != null; + } + + /** + * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated + * buffers. + */ + @Deprecated + public boolean hasThreadLocalCache() { + return threadCache.isSet(); + } + + /** + * Free all cached buffers for the calling {@link Thread}. + */ + @Deprecated + public void freeThreadLocalCache() { + threadCache.remove(); + } + + final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { + private final boolean useCacheForAllThreads; + + PoolThreadLocalCache(boolean useCacheForAllThreads) { + this.useCacheForAllThreads = useCacheForAllThreads; + } + + @Override + protected synchronized PoolThreadCache initialValue() { + final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); + final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); + + Thread current = Thread.currentThread(); + if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { + return new PoolThreadCache( + heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, + DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + } + // No caching so just use 0 as sizes. 
+ return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); + } + + @Override + protected void onRemoval(PoolThreadCache threadCache) { + threadCache.free(false); + } + + private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) { + if (arenas == null || arenas.length == 0) { + return null; + } + + PoolArena<T> minArena = arenas[0]; + for (int i = 1; i < arenas.length; i++) { + PoolArena<T> arena = arenas[i]; + if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) { + minArena = arena; + } + } + + return minArena; + } + } + + @Override + public PooledByteBufAllocatorMetric metric() { + return metric; + } + + /** + * Return the number of heap arenas. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}. + */ + @Deprecated + public int numHeapArenas() { + return heapArenaMetrics.size(); + } + + /** + * Return the number of direct arenas. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}. + */ + @Deprecated + public int numDirectArenas() { + return directArenaMetrics.size(); + } + + /** + * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}. + */ + @Deprecated + public List<PoolArenaMetric> heapArenas() { + return heapArenaMetrics; + } + + /** + * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}. + */ + @Deprecated + public List<PoolArenaMetric> directArenas() { + return directArenaMetrics; + } + + /** + * Return the number of thread local caches used by this {@link PooledByteBufAllocator}. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}. + */ + @Deprecated + public int numThreadLocalCaches() { + PoolArena<?>[] arenas = heapArenas != null ? 
heapArenas : directArenas; + if (arenas == null) { + return 0; + } + + int total = 0; + for (PoolArena<?> arena : arenas) { + total += arena.numThreadCaches.get(); + } + + return total; + } + + /** + * Return the size of the tiny cache. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}. + */ + @Deprecated + public int tinyCacheSize() { + return tinyCacheSize; + } + + /** + * Return the size of the small cache. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}. + */ + @Deprecated + public int smallCacheSize() { + return smallCacheSize; + } + + /** + * Return the size of the normal cache. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}. + */ + @Deprecated + public int normalCacheSize() { + return normalCacheSize; + } + + /** + * Return the chunk size for an arena. + * + * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}. + */ + @Deprecated + public final int chunkSize() { + return chunkSize; + } + + final long usedHeapMemory() { + return usedMemory(heapArenas); + } + + final long usedDirectMemory() { + return usedMemory(directArenas); + } + + private static long usedMemory(PoolArena<?>[] arenas) { + if (arenas == null) { + return -1; + } + long used = 0; + for (PoolArena<?> arena : arenas) { + used += arena.numActiveBytes(); + if (used < 0) { + return Long.MAX_VALUE; + } + } + return used; + } + + final PoolThreadCache threadCache() { + PoolThreadCache cache = threadCache.get(); + assert cache != null; + return cache; + } + + /** + * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive + * and so should not called too frequently. + */ + public String dumpStats() { + int heapArenasLen = heapArenas == null ? 
0 : heapArenas.length; + StringBuilder buf = new StringBuilder(512) + .append(heapArenasLen) + .append(" heap arena(s):") + .append(StringUtil.NEWLINE); + if (heapArenasLen > 0) { + for (PoolArena<byte[]> a: heapArenas) { + buf.append(a); + } + } + + int directArenasLen = directArenas == null ? 0 : directArenas.length; + + buf.append(directArenasLen) + .append(" direct arena(s):") + .append(StringUtil.NEWLINE); + if (directArenasLen > 0) { + for (PoolArena<ByteBuffer> a: directArenas) { + buf.append(a); + } + } + + return buf.toString(); + } +} diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index 4a56880..ff3eb79 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -28,6 +28,9 @@ under the License. <!-- Sometimes we have to temporarily fix very long, different formatted Calcite files. --> <suppress files="org[\\/]apache[\\/]calcite.*" checks="[a-zA-Z0-9]*"/> + <!-- Temporarily fix TM Metaspace memory leak caused by Apache Beam sdk harness. --> + <suppress files="org[\\/]apache[\\/]beam[\\/]vendor[\\/]grpc[\\/]v1p21p0[\\/]io[\\/]netty[\\/]buffer.*.java" checks="[a-zA-Z0-9]*"/> + <!-- Python streaming API follows python naming conventions --> <suppress files="org[\\/]apache[\\/]flink[\\/]streaming[\\/]python[\\/]api[\\/].*.java"
