This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfd66a4296e9237ef287c4539977a70ccb963bc3
Author: Wei Zhong <[email protected]>
AuthorDate: Thu Jan 9 09:59:26 2020 +0800

    [FLINK-15338][python] Cherry-pick NETTY#8955 to fix the TM Metaspace memory 
leak problem in shaded netty when submitting PyFlink UDF jobs multiple times.
---
 .../grpc/v1p21p0/io/netty/buffer/PoolArena.java    | 818 +++++++++++++++++++++
 .../v1p21p0/io/netty/buffer/PoolThreadCache.java   | 508 +++++++++++++
 .../io/netty/buffer/PooledByteBufAllocator.java    | 640 ++++++++++++++++
 tools/maven/suppressions.xml                       |   3 +
 4 files changed, 1969 insertions(+)

diff --git 
a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java
 
b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java
new file mode 100644
index 0000000..c573759
--- /dev/null
+++ 
b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolArena.java
@@ -0,0 +1,818 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
+
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.LongCounter;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+
+// This class is copied from Netty's io.netty.buffer.PoolArena,
+// can be removed after Beam bumps its shaded netty version to 1.22+ 
(BEAM-9030).
+//
+// Changed lines: 284, 295, 297~300
+
+abstract class PoolArena<T> implements PoolArenaMetric {
+       static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
+
+       enum SizeClass {
+               Tiny,
+               Small,
+               Normal
+       }
+
+       static final int numTinySubpagePools = 512 >>> 4;
+
+       final PooledByteBufAllocator parent;
+
+       private final int maxOrder;
+       final int pageSize;
+       final int pageShifts;
+       final int chunkSize;
+       final int subpageOverflowMask;
+       final int numSmallSubpagePools;
+       final int directMemoryCacheAlignment;
+       final int directMemoryCacheAlignmentMask;
+       private final PoolSubpage<T>[] tinySubpagePools;
+       private final PoolSubpage<T>[] smallSubpagePools;
+
+       private final PoolChunkList<T> q050;
+       private final PoolChunkList<T> q025;
+       private final PoolChunkList<T> q000;
+       private final PoolChunkList<T> qInit;
+       private final PoolChunkList<T> q075;
+       private final PoolChunkList<T> q100;
+
+       private final List<PoolChunkListMetric> chunkListMetrics;
+
+       // Metrics for allocations and deallocations
+       private long allocationsNormal;
+       // We need to use the LongCounter here as this is not guarded via 
synchronized block.
+       private final LongCounter allocationsTiny = 
PlatformDependent.newLongCounter();
+       private final LongCounter allocationsSmall = 
PlatformDependent.newLongCounter();
+       private final LongCounter allocationsHuge = 
PlatformDependent.newLongCounter();
+       private final LongCounter activeBytesHuge = 
PlatformDependent.newLongCounter();
+
+       private long deallocationsTiny;
+       private long deallocationsSmall;
+       private long deallocationsNormal;
+
+       // We need to use the LongCounter here as this is not guarded via 
synchronized block.
+       private final LongCounter deallocationsHuge = 
PlatformDependent.newLongCounter();
+
+       // Number of thread caches backed by this arena.
+       final AtomicInteger numThreadCaches = new AtomicInteger();
+
+       // TODO: Test if adding padding helps under contention
+       //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
+
+       protected PoolArena(PooledByteBufAllocator parent, int pageSize,
+                                               int maxOrder, int pageShifts, 
int chunkSize, int cacheAlignment) {
+               this.parent = parent;
+               this.pageSize = pageSize;
+               this.maxOrder = maxOrder;
+               this.pageShifts = pageShifts;
+               this.chunkSize = chunkSize;
+               directMemoryCacheAlignment = cacheAlignment;
+               directMemoryCacheAlignmentMask = cacheAlignment - 1;
+               subpageOverflowMask = ~(pageSize - 1);
+               tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
+               for (int i = 0; i < tinySubpagePools.length; i ++) {
+                       tinySubpagePools[i] = newSubpagePoolHead(pageSize);
+               }
+
+               numSmallSubpagePools = pageShifts - 9;
+               smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
+               for (int i = 0; i < smallSubpagePools.length; i ++) {
+                       smallSubpagePools[i] = newSubpagePoolHead(pageSize);
+               }
+
+               q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, 
chunkSize);
+               q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
+               q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
+               q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
+               q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
+               qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, 
chunkSize);
+
+               q100.prevList(q075);
+               q075.prevList(q050);
+               q050.prevList(q025);
+               q025.prevList(q000);
+               q000.prevList(null);
+               qInit.prevList(qInit);
+
+               List<PoolChunkListMetric> metrics = new 
ArrayList<PoolChunkListMetric>(6);
+               metrics.add(qInit);
+               metrics.add(q000);
+               metrics.add(q025);
+               metrics.add(q050);
+               metrics.add(q075);
+               metrics.add(q100);
+               chunkListMetrics = Collections.unmodifiableList(metrics);
+       }
+
+       private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
+               PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
+               head.prev = head;
+               head.next = head;
+               return head;
+       }
+
+       @SuppressWarnings("unchecked")
+       private PoolSubpage<T>[] newSubpagePoolArray(int size) {
+               return new PoolSubpage[size];
+       }
+
+       abstract boolean isDirect();
+
+       PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int 
maxCapacity) {
+               PooledByteBuf<T> buf = newByteBuf(maxCapacity);
+               allocate(cache, buf, reqCapacity);
+               return buf;
+       }
+
+       static int tinyIdx(int normCapacity) {
+               return normCapacity >>> 4;
+       }
+
+       static int smallIdx(int normCapacity) {
+               int tableIdx = 0;
+               int i = normCapacity >>> 10;
+               while (i != 0) {
+                       i >>>= 1;
+                       tableIdx ++;
+               }
+               return tableIdx;
+       }
+
+       // capacity < pageSize
+       boolean isTinyOrSmall(int normCapacity) {
+               return (normCapacity & subpageOverflowMask) == 0;
+       }
+
+       // normCapacity < 512
+       static boolean isTiny(int normCapacity) {
+               return (normCapacity & 0xFFFFFE00) == 0;
+       }
+
+       private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, 
final int reqCapacity) {
+               final int normCapacity = normalizeCapacity(reqCapacity);
+               if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
+                       int tableIdx;
+                       PoolSubpage<T>[] table;
+                       boolean tiny = isTiny(normCapacity);
+                       if (tiny) { // < 512
+                               if (cache.allocateTiny(this, buf, reqCapacity, 
normCapacity)) {
+                                       // was able to allocate out of the 
cache so move on
+                                       return;
+                               }
+                               tableIdx = tinyIdx(normCapacity);
+                               table = tinySubpagePools;
+                       } else {
+                               if (cache.allocateSmall(this, buf, reqCapacity, 
normCapacity)) {
+                                       // was able to allocate out of the 
cache so move on
+                                       return;
+                               }
+                               tableIdx = smallIdx(normCapacity);
+                               table = smallSubpagePools;
+                       }
+
+                       final PoolSubpage<T> head = table[tableIdx];
+
+                       /**
+                        * Synchronize on the head. This is needed as {@link 
PoolChunk#allocateSubpage(int)} and
+                        * {@link PoolChunk#free(long)} may modify the doubly 
linked list as well.
+                        */
+                       synchronized (head) {
+                               final PoolSubpage<T> s = head.next;
+                               if (s != head) {
+                                       assert s.doNotDestroy && s.elemSize == 
normCapacity;
+                                       long handle = s.allocate();
+                                       assert handle >= 0;
+                                       s.chunk.initBufWithSubpage(buf, null, 
handle, reqCapacity);
+                                       incTinySmallAllocation(tiny);
+                                       return;
+                               }
+                       }
+                       synchronized (this) {
+                               allocateNormal(buf, reqCapacity, normCapacity);
+                       }
+
+                       incTinySmallAllocation(tiny);
+                       return;
+               }
+               if (normCapacity <= chunkSize) {
+                       if (cache.allocateNormal(this, buf, reqCapacity, 
normCapacity)) {
+                               // was able to allocate out of the cache so 
move on
+                               return;
+                       }
+                       synchronized (this) {
+                               allocateNormal(buf, reqCapacity, normCapacity);
+                               ++allocationsNormal;
+                       }
+               } else {
+                       // Huge allocations are never served via the cache so 
just call allocateHuge
+                       allocateHuge(buf, reqCapacity);
+               }
+       }
+
+       // Method must be called inside synchronized(this) { ... } block
+       private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int 
normCapacity) {
+               if (q050.allocate(buf, reqCapacity, normCapacity) || 
q025.allocate(buf, reqCapacity, normCapacity) ||
+                       q000.allocate(buf, reqCapacity, normCapacity) || 
qInit.allocate(buf, reqCapacity, normCapacity) ||
+                       q075.allocate(buf, reqCapacity, normCapacity)) {
+                       return;
+               }
+
+               // Add a new chunk.
+               PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, 
chunkSize);
+               boolean success = c.allocate(buf, reqCapacity, normCapacity);
+               assert success;
+               qInit.add(c);
+       }
+
+       private void incTinySmallAllocation(boolean tiny) {
+               if (tiny) {
+                       allocationsTiny.increment();
+               } else {
+                       allocationsSmall.increment();
+               }
+       }
+
+       private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
+               PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
+               activeBytesHuge.add(chunk.chunkSize());
+               buf.initUnpooled(chunk, reqCapacity);
+               allocationsHuge.increment();
+       }
+
+       void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int 
normCapacity, PoolThreadCache cache) {
+               if (chunk.unpooled) {
+                       int size = chunk.chunkSize();
+                       destroyChunk(chunk);
+                       activeBytesHuge.add(-size);
+                       deallocationsHuge.increment();
+               } else {
+                       SizeClass sizeClass = sizeClass(normCapacity);
+                       if (cache != null && cache.add(this, chunk, nioBuffer, 
handle, normCapacity, sizeClass)) {
+                               // cached so not free it.
+                               return;
+                       }
+
+                       freeChunk(chunk, handle, sizeClass, nioBuffer, false);
+               }
+       }
+
+       private SizeClass sizeClass(int normCapacity) {
+               if (!isTinyOrSmall(normCapacity)) {
+                       return SizeClass.Normal;
+               }
+               return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
+       }
+
+       void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, 
ByteBuffer nioBuffer, boolean finalizer) {
+               final boolean destroyChunk;
+               synchronized (this) {
+                       // We only call this if freeChunk is not called because 
of the PoolThreadCache finalizer as otherwise this
+                       // may fail due lazy class-loading in for example 
tomcat.
+                       if (!finalizer) {
+                               switch (sizeClass) {
+                                       case Normal:
+                                               ++deallocationsNormal;
+                                               break;
+                                       case Small:
+                                               ++deallocationsSmall;
+                                               break;
+                                       case Tiny:
+                                               ++deallocationsTiny;
+                                               break;
+                                       default:
+                                               throw new Error();
+                               }
+                       }
+                       destroyChunk = !chunk.parent.free(chunk, handle, 
nioBuffer);
+               }
+               if (destroyChunk) {
+                       // destroyChunk not need to be called while holding the 
synchronized lock.
+                       destroyChunk(chunk);
+               }
+       }
+
+       PoolSubpage<T> findSubpagePoolHead(int elemSize) {
+               int tableIdx;
+               PoolSubpage<T>[] table;
+               if (isTiny(elemSize)) { // < 512
+                       tableIdx = elemSize >>> 4;
+                       table = tinySubpagePools;
+               } else {
+                       tableIdx = 0;
+                       elemSize >>>= 10;
+                       while (elemSize != 0) {
+                               elemSize >>>= 1;
+                               tableIdx ++;
+                       }
+                       table = smallSubpagePools;
+               }
+
+               return table[tableIdx];
+       }
+
+       int normalizeCapacity(int reqCapacity) {
+               checkPositiveOrZero(reqCapacity, "reqCapacity");
+
+               if (reqCapacity >= chunkSize) {
+                       return directMemoryCacheAlignment == 0 ? reqCapacity : 
alignCapacity(reqCapacity);
+               }
+
+               if (!isTiny(reqCapacity)) { // >= 512
+                       // Doubled
+
+                       int normalizedCapacity = reqCapacity;
+                       normalizedCapacity --;
+                       normalizedCapacity |= normalizedCapacity >>>  1;
+                       normalizedCapacity |= normalizedCapacity >>>  2;
+                       normalizedCapacity |= normalizedCapacity >>>  4;
+                       normalizedCapacity |= normalizedCapacity >>>  8;
+                       normalizedCapacity |= normalizedCapacity >>> 16;
+                       normalizedCapacity ++;
+
+                       if (normalizedCapacity < 0) {
+                               normalizedCapacity >>>= 1;
+                       }
+                       assert directMemoryCacheAlignment == 0 || 
(normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
+
+                       return normalizedCapacity;
+               }
+
+               if (directMemoryCacheAlignment > 0) {
+                       return alignCapacity(reqCapacity);
+               }
+
+               // Quantum-spaced
+               if ((reqCapacity & 15) == 0) {
+                       return reqCapacity;
+               }
+
+               return (reqCapacity & ~15) + 16;
+       }
+
+       int alignCapacity(int reqCapacity) {
+               int delta = reqCapacity & directMemoryCacheAlignmentMask;
+               return delta == 0 ? reqCapacity : reqCapacity + 
directMemoryCacheAlignment - delta;
+       }
+
+       void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean 
freeOldMemory) {
+               if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
+                       throw new IllegalArgumentException("newCapacity: " + 
newCapacity);
+               }
+
+               int oldCapacity = buf.length;
+               if (oldCapacity == newCapacity) {
+                       return;
+               }
+
+               PoolChunk<T> oldChunk = buf.chunk;
+               ByteBuffer oldNioBuffer = buf.tmpNioBuf;
+               long oldHandle = buf.handle;
+               T oldMemory = buf.memory;
+               int oldOffset = buf.offset;
+               int oldMaxLength = buf.maxLength;
+               int readerIndex = buf.readerIndex();
+               int writerIndex = buf.writerIndex();
+
+               allocate(parent.threadCache(), buf, newCapacity);
+               if (newCapacity > oldCapacity) {
+                       memoryCopy(
+                               oldMemory, oldOffset,
+                               buf.memory, buf.offset, oldCapacity);
+               } else if (newCapacity < oldCapacity) {
+                       if (readerIndex < newCapacity) {
+                               if (writerIndex > newCapacity) {
+                                       writerIndex = newCapacity;
+                               }
+                               memoryCopy(
+                                       oldMemory, oldOffset + readerIndex,
+                                       buf.memory, buf.offset + readerIndex, 
writerIndex - readerIndex);
+                       } else {
+                               readerIndex = writerIndex = newCapacity;
+                       }
+               }
+
+               buf.setIndex(readerIndex, writerIndex);
+
+               if (freeOldMemory) {
+                       free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, 
buf.cache);
+               }
+       }
+
+       @Override
+       public int numThreadCaches() {
+               return numThreadCaches.get();
+       }
+
+       @Override
+       public int numTinySubpages() {
+               return tinySubpagePools.length;
+       }
+
+       @Override
+       public int numSmallSubpages() {
+               return smallSubpagePools.length;
+       }
+
+       @Override
+       public int numChunkLists() {
+               return chunkListMetrics.size();
+       }
+
+       @Override
+       public List<PoolSubpageMetric> tinySubpages() {
+               return subPageMetricList(tinySubpagePools);
+       }
+
+       @Override
+       public List<PoolSubpageMetric> smallSubpages() {
+               return subPageMetricList(smallSubpagePools);
+       }
+
+       @Override
+       public List<PoolChunkListMetric> chunkLists() {
+               return chunkListMetrics;
+       }
+
+       private static List<PoolSubpageMetric> 
subPageMetricList(PoolSubpage<?>[] pages) {
+               List<PoolSubpageMetric> metrics = new 
ArrayList<PoolSubpageMetric>();
+               for (PoolSubpage<?> head : pages) {
+                       if (head.next == head) {
+                               continue;
+                       }
+                       PoolSubpage<?> s = head.next;
+                       for (;;) {
+                               metrics.add(s);
+                               s = s.next;
+                               if (s == head) {
+                                       break;
+                               }
+                       }
+               }
+               return metrics;
+       }
+
+       @Override
+       public long numAllocations() {
+               final long allocsNormal;
+               synchronized (this) {
+                       allocsNormal = allocationsNormal;
+               }
+               return allocationsTiny.value() + allocationsSmall.value() + 
allocsNormal + allocationsHuge.value();
+       }
+
+       @Override
+       public long numTinyAllocations() {
+               return allocationsTiny.value();
+       }
+
+       @Override
+       public long numSmallAllocations() {
+               return allocationsSmall.value();
+       }
+
+       @Override
+       public synchronized long numNormalAllocations() {
+               return allocationsNormal;
+       }
+
+       @Override
+       public long numDeallocations() {
+               final long deallocs;
+               synchronized (this) {
+                       deallocs = deallocationsTiny + deallocationsSmall + 
deallocationsNormal;
+               }
+               return deallocs + deallocationsHuge.value();
+       }
+
+       @Override
+       public synchronized long numTinyDeallocations() {
+               return deallocationsTiny;
+       }
+
+       @Override
+       public synchronized long numSmallDeallocations() {
+               return deallocationsSmall;
+       }
+
+       @Override
+       public synchronized long numNormalDeallocations() {
+               return deallocationsNormal;
+       }
+
+       @Override
+       public long numHugeAllocations() {
+               return allocationsHuge.value();
+       }
+
+       @Override
+       public long numHugeDeallocations() {
+               return deallocationsHuge.value();
+       }
+
+       @Override
+       public  long numActiveAllocations() {
+               long val = allocationsTiny.value() + allocationsSmall.value() + 
allocationsHuge.value()
+                       - deallocationsHuge.value();
+               synchronized (this) {
+                       val += allocationsNormal - (deallocationsTiny + 
deallocationsSmall + deallocationsNormal);
+               }
+               return max(val, 0);
+       }
+
+       @Override
+       public long numActiveTinyAllocations() {
+               return max(numTinyAllocations() - numTinyDeallocations(), 0);
+       }
+
+       @Override
+       public long numActiveSmallAllocations() {
+               return max(numSmallAllocations() - numSmallDeallocations(), 0);
+       }
+
+       @Override
+       public long numActiveNormalAllocations() {
+               final long val;
+               synchronized (this) {
+                       val = allocationsNormal - deallocationsNormal;
+               }
+               return max(val, 0);
+       }
+
+       @Override
+       public long numActiveHugeAllocations() {
+               return max(numHugeAllocations() - numHugeDeallocations(), 0);
+       }
+
+       @Override
+       public long numActiveBytes() {
+               long val = activeBytesHuge.value();
+               synchronized (this) {
+                       for (int i = 0; i < chunkListMetrics.size(); i++) {
+                               for (PoolChunkMetric m: 
chunkListMetrics.get(i)) {
+                                       val += m.chunkSize();
+                               }
+                       }
+               }
+               return max(0, val);
+       }
+
+       protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, 
int pageShifts, int chunkSize);
+       protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
+       protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
+       protected abstract void memoryCopy(T src, int srcOffset, T dst, int 
dstOffset, int length);
+       protected abstract void destroyChunk(PoolChunk<T> chunk);
+
+       @Override
+       public synchronized String toString() {
+               StringBuilder buf = new StringBuilder()
+                       .append("Chunk(s) at 0~25%:")
+                       .append(StringUtil.NEWLINE)
+                       .append(qInit)
+                       .append(StringUtil.NEWLINE)
+                       .append("Chunk(s) at 0~50%:")
+                       .append(StringUtil.NEWLINE)
+                       .append(q000)
+                       .append(StringUtil.NEWLINE)
+                       .append("Chunk(s) at 25~75%:")
+                       .append(StringUtil.NEWLINE)
+                       .append(q025)
+                       .append(StringUtil.NEWLINE)
+                       .append("Chunk(s) at 50~100%:")
+                       .append(StringUtil.NEWLINE)
+                       .append(q050)
+                       .append(StringUtil.NEWLINE)
+                       .append("Chunk(s) at 75~100%:")
+                       .append(StringUtil.NEWLINE)
+                       .append(q075)
+                       .append(StringUtil.NEWLINE)
+                       .append("Chunk(s) at 100%:")
+                       .append(StringUtil.NEWLINE)
+                       .append(q100)
+                       .append(StringUtil.NEWLINE)
+                       .append("tiny subpages:");
+               appendPoolSubPages(buf, tinySubpagePools);
+               buf.append(StringUtil.NEWLINE)
+                       .append("small subpages:");
+               appendPoolSubPages(buf, smallSubpagePools);
+               buf.append(StringUtil.NEWLINE);
+
+               return buf.toString();
+       }
+
+       private static void appendPoolSubPages(StringBuilder buf, 
PoolSubpage<?>[] subpages) {
+               for (int i = 0; i < subpages.length; i ++) {
+                       PoolSubpage<?> head = subpages[i];
+                       if (head.next == head) {
+                               continue;
+                       }
+
+                       buf.append(StringUtil.NEWLINE)
+                               .append(i)
+                               .append(": ");
+                       PoolSubpage<?> s = head.next;
+                       for (;;) {
+                               buf.append(s);
+                               s = s.next;
+                               if (s == head) {
+                                       break;
+                               }
+                       }
+               }
+       }
+
+       @Override
+       protected final void finalize() throws Throwable {
+               try {
+                       super.finalize();
+               } finally {
+                       destroyPoolSubPages(smallSubpagePools);
+                       destroyPoolSubPages(tinySubpagePools);
+                       destroyPoolChunkLists(qInit, q000, q025, q050, q075, 
q100);
+               }
+       }
+
+       private static void destroyPoolSubPages(PoolSubpage<?>[] pages) {
+               for (PoolSubpage<?> page : pages) {
+                       page.destroy();
+               }
+       }
+
+       private void destroyPoolChunkLists(PoolChunkList<T>... chunkLists) {
+               for (PoolChunkList<T> chunkList: chunkLists) {
+                       chunkList.destroy(this);
+               }
+       }
+
+       static final class HeapArena extends PoolArena<byte[]> {
+
+               HeapArena(PooledByteBufAllocator parent, int pageSize, int 
maxOrder,
+                                 int pageShifts, int chunkSize, int 
directMemoryCacheAlignment) {
+                       super(parent, pageSize, maxOrder, pageShifts, chunkSize,
+                               directMemoryCacheAlignment);
+               }
+
+               private static byte[] newByteArray(int size) {
+                       return 
PlatformDependent.allocateUninitializedArray(size);
+               }
+
+               @Override
+               boolean isDirect() {
+                       return false;
+               }
+
+               @Override
+               protected PoolChunk<byte[]> newChunk(int pageSize, int 
maxOrder, int pageShifts, int chunkSize) {
+                       return new PoolChunk<byte[]>(this, 
newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
+               }
+
+               @Override
+               protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
+                       return new PoolChunk<byte[]>(this, 
newByteArray(capacity), capacity, 0);
+               }
+
+               @Override
+               protected void destroyChunk(PoolChunk<byte[]> chunk) {
+                       // Rely on GC.
+               }
+
+               @Override
+               protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
+                       return HAS_UNSAFE ? 
PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
+                               : PooledHeapByteBuf.newInstance(maxCapacity);
+               }
+
+               @Override
+               protected void memoryCopy(byte[] src, int srcOffset, byte[] 
dst, int dstOffset, int length) {
+                       if (length == 0) {
+                               return;
+                       }
+
+                       System.arraycopy(src, srcOffset, dst, dstOffset, 
length);
+               }
+       }
+
	/**
	 * {@link PoolArena} implementation backed by direct (off-heap) {@link ByteBuffer}s.
	 *
	 * <p>When {@code directMemoryCacheAlignment > 0}, chunks are over-allocated by the
	 * alignment amount and an offset is computed ({@link #offsetCacheLine(ByteBuffer)})
	 * so the usable memory starts on an aligned address.</p>
	 */
	static final class DirectArena extends PoolArena<ByteBuffer> {

		DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
					int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
			super(parent, pageSize, maxOrder, pageShifts, chunkSize,
				directMemoryCacheAlignment);
		}

		@Override
		boolean isDirect() {
			return true;
		}

		// mark as package-private, only for unit test
		int offsetCacheLine(ByteBuffer memory) {
			// We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) will
			// throw an NPE.
			int remainder = HAS_UNSAFE
				? (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
				: 0;

			// offset = alignment - address & (alignment - 1)
			return directMemoryCacheAlignment - remainder;
		}

		@Override
		protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
												 int pageShifts, int chunkSize) {
			if (directMemoryCacheAlignment == 0) {
				// No alignment requested: allocate exactly chunkSize with offset 0.
				return new PoolChunk<ByteBuffer>(this,
					allocateDirect(chunkSize), pageSize, maxOrder,
					pageShifts, chunkSize, 0);
			}
			// Over-allocate by the alignment so an aligned offset always fits.
			final ByteBuffer memory = allocateDirect(chunkSize
				+ directMemoryCacheAlignment);
			return new PoolChunk<ByteBuffer>(this, memory, pageSize,
				maxOrder, pageShifts, chunkSize,
				offsetCacheLine(memory));
		}

		@Override
		protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
			if (directMemoryCacheAlignment == 0) {
				return new PoolChunk<ByteBuffer>(this,
					allocateDirect(capacity), capacity, 0);
			}
			final ByteBuffer memory = allocateDirect(capacity
				+ directMemoryCacheAlignment);
			return new PoolChunk<ByteBuffer>(this, memory, capacity,
				offsetCacheLine(memory));
		}

		// Allocates without an attached Cleaner when the platform supports it; such
		// buffers must be (and are) freed explicitly in destroyChunk(...).
		private static ByteBuffer allocateDirect(int capacity) {
			return PlatformDependent.useDirectBufferNoCleaner() ?
				PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
		}

		@Override
		protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
			// Free with the counterpart of whichever allocation path allocateDirect(...) used.
			if (PlatformDependent.useDirectBufferNoCleaner()) {
				PlatformDependent.freeDirectNoCleaner(chunk.memory);
			} else {
				PlatformDependent.freeDirectBuffer(chunk.memory);
			}
		}

		@Override
		protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
			if (HAS_UNSAFE) {
				return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
			} else {
				return PooledDirectByteBuf.newInstance(maxCapacity);
			}
		}

		@Override
		protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
			if (length == 0) {
				return;
			}

			if (HAS_UNSAFE) {
				// Raw address copy; no position/limit bookkeeping needed.
				PlatformDependent.copyMemory(
					PlatformDependent.directBufferAddress(src) + srcOffset,
					PlatformDependent.directBufferAddress(dst) + dstOffset, length);
			} else {
				// We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
				src = src.duplicate();
				dst = dst.duplicate();
				src.position(srcOffset).limit(srcOffset + length);
				dst.position(dstOffset);
				dst.put(src);
			}
		}
	}
+}
diff --git 
a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java
 
b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java
new file mode 100644
index 0000000..37244e5
--- /dev/null
+++ 
b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PoolThreadCache.java
@@ -0,0 +1,508 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
+
+
+import static 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer.PoolArena.SizeClass;
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler;
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.Recycler.Handle;
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.MathUtil;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+// This class is copied from Netty's io.netty.buffer.PoolThreadCache,
+// can be removed after Beam bumps its shaded netty version to 1.22+ 
(BEAM-9030).
+//
+// Changed lines: 235, 242, 246~251, 268, 275, 280, 284, 426~427, 430, 435, 
453, 458, 463~467, 469
+
+/**
+ * Acts as a thread cache for allocations. This implementation is modeled after
+ * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the described
+ * techniques of
+ * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
+ * Scalable memory allocation using jemalloc</a>.
+ */
final class PoolThreadCache {

	private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

	final PoolArena<byte[]> heapArena;
	final PoolArena<ByteBuffer> directArena;

	// Hold the caches for the different size classes, which are tiny, small and normal.
	private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
	private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
	private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
	private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
	private final MemoryRegionCache<byte[]>[] normalHeapCaches;
	private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

	// Used for bitshifting when calculate the index of normal caches later
	private final int numShiftsNormalDirect;
	private final int numShiftsNormalHeap;
	private final int freeSweepAllocationThreshold;
	// Guards against double-free: free(...) may be reached both from finalize() and
	// from an explicit removal callback.
	private final AtomicBoolean freed = new AtomicBoolean();

	// Allocation counter since the last trim; not thread-safe by design (single-thread cache).
	private int allocations;

	// TODO: Test if adding padding helps under contention
	//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

	/**
	 * Creates a per-thread cache bound to the given arenas.
	 *
	 * @param heapArena arena serving heap (byte[]) allocations; may be {@code null}
	 * @param directArena arena serving direct (ByteBuffer) allocations; may be {@code null}
	 * @param tinyCacheSize queue capacity for each tiny size-class cache
	 * @param smallCacheSize queue capacity for each small size-class cache
	 * @param normalCacheSize queue capacity for each normal size-class cache
	 * @param maxCachedBufferCapacity largest normal buffer capacity that will be cached
	 * @param freeSweepAllocationThreshold number of allocations between trim() sweeps;
	 *        must be {@code >= 1} if any cache is in use
	 */
	PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
					int tinyCacheSize, int smallCacheSize, int normalCacheSize,
					int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
		checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
		this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
		this.heapArena = heapArena;
		this.directArena = directArena;
		if (directArena != null) {
			tinySubPageDirectCaches = createSubPageCaches(
				tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
			smallSubPageDirectCaches = createSubPageCaches(
				smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

			numShiftsNormalDirect = log2(directArena.pageSize);
			normalDirectCaches = createNormalCaches(
				normalCacheSize, maxCachedBufferCapacity, directArena);

			directArena.numThreadCaches.getAndIncrement();
		} else {
			// No directArea is configured so just null out all caches
			tinySubPageDirectCaches = null;
			smallSubPageDirectCaches = null;
			normalDirectCaches = null;
			numShiftsNormalDirect = -1;
		}
		if (heapArena != null) {
			// Create the caches for the heap allocations
			tinySubPageHeapCaches = createSubPageCaches(
				tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
			smallSubPageHeapCaches = createSubPageCaches(
				smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

			numShiftsNormalHeap = log2(heapArena.pageSize);
			normalHeapCaches = createNormalCaches(
				normalCacheSize, maxCachedBufferCapacity, heapArena);

			heapArena.numThreadCaches.getAndIncrement();
		} else {
			// No heapArea is configured so just null out all caches
			tinySubPageHeapCaches = null;
			smallSubPageHeapCaches = null;
			normalHeapCaches = null;
			numShiftsNormalHeap = -1;
		}

		// Only check if there are caches in use.
		if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
			|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
			&& freeSweepAllocationThreshold < 1) {
			throw new IllegalArgumentException("freeSweepAllocationThreshold: "
				+ freeSweepAllocationThreshold + " (expected: > 0)");
		}
	}

	private static <T> MemoryRegionCache<T>[] createSubPageCaches(
		int cacheSize, int numCaches, SizeClass sizeClass) {
		if (cacheSize > 0 && numCaches > 0) {
			@SuppressWarnings("unchecked")
			MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
			for (int i = 0; i < cache.length; i++) {
				// TODO: maybe use cacheSize / cache.length
				cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
			}
			return cache;
		} else {
			return null;
		}
	}

	private static <T> MemoryRegionCache<T>[] createNormalCaches(
		int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
		if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
			// One cache slot per power-of-two capacity between pageSize and max.
			int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
			int arraySize = Math.max(1, log2(max / area.pageSize) + 1);

			@SuppressWarnings("unchecked")
			MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
			for (int i = 0; i < cache.length; i++) {
				cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
			}
			return cache;
		} else {
			return null;
		}
	}

	// Integer floor(log2); log2(1) == 0.
	private static int log2(int val) {
		int res = 0;
		while (val > 1) {
			val >>= 1;
			res++;
		}
		return res;
	}

	/**
	 * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
	 */
	boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
		return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
	}

	/**
	 * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
	 */
	boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
		return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
	}

	/**
	 * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
	 */
	boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
		return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
	}

	@SuppressWarnings({ "unchecked", "rawtypes" })
	private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
		if (cache == null) {
			// no cache found so just return false here
			return false;
		}
		boolean allocated = cache.allocate(buf, reqCapacity);
		// Periodically sweep under-used caches; counted even for failed allocations.
		if (++ allocations >= freeSweepAllocationThreshold) {
			allocations = 0;
			trim();
		}
		return allocated;
	}

	/**
	 * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
	 * Returns {@code true} if it fit into the cache {@code false} otherwise.
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
				long handle, int normCapacity, SizeClass sizeClass) {
		MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
		if (cache == null) {
			return false;
		}
		return cache.add(chunk, nioBuffer, handle);
	}

	// Dispatches to the per-size-class cache lookup; exhaustive over SizeClass.
	private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
		switch (sizeClass) {
			case Normal:
				return cacheForNormal(area, normCapacity);
			case Small:
				return cacheForSmall(area, normCapacity);
			case Tiny:
				return cacheForTiny(area, normCapacity);
			default:
				throw new Error();
		}
	}

	/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
	@Override
	protected void finalize() throws Throwable {
		try {
			super.finalize();
		} finally {
			// Safety net: free cached chunks even if the owning thread never
			// triggered an explicit free. The `true` flag marks the finalizer path.
			free(true);
		}
	}

	/**
	 *  Should be called if the Thread that uses this cache is about to exit to release resources out of the cache
	 */
	void free(boolean finalizer) {
		// As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
		// we only call this one time.
		if (freed.compareAndSet(false, true)) {
			int numFreed = free(tinySubPageDirectCaches, finalizer) +
				free(smallSubPageDirectCaches, finalizer) +
				free(normalDirectCaches, finalizer) +
				free(tinySubPageHeapCaches, finalizer) +
				free(smallSubPageHeapCaches, finalizer) +
				free(normalHeapCaches, finalizer);

			if (numFreed > 0 && logger.isDebugEnabled()) {
				logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
					Thread.currentThread().getName());
			}

			if (directArena != null) {
				directArena.numThreadCaches.getAndDecrement();
			}

			if (heapArena != null) {
				heapArena.numThreadCaches.getAndDecrement();
			}
		}
	}

	private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
		if (caches == null) {
			return 0;
		}

		int numFreed = 0;
		for (MemoryRegionCache<?> c: caches) {
			numFreed += free(c, finalizer);
		}
		return numFreed;
	}

	private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
		if (cache == null) {
			return 0;
		}
		return cache.free(finalizer);
	}

	void trim() {
		trim(tinySubPageDirectCaches);
		trim(smallSubPageDirectCaches);
		trim(normalDirectCaches);
		trim(tinySubPageHeapCaches);
		trim(smallSubPageHeapCaches);
		trim(normalHeapCaches);
	}

	private static void trim(MemoryRegionCache<?>[] caches) {
		if (caches == null) {
			return;
		}
		for (MemoryRegionCache<?> c: caches) {
			trim(c);
		}
	}

	private static void trim(MemoryRegionCache<?> cache) {
		if (cache == null) {
			return;
		}
		cache.trim();
	}

	private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
		int idx = PoolArena.tinyIdx(normCapacity);
		if (area.isDirect()) {
			return cache(tinySubPageDirectCaches, idx);
		}
		return cache(tinySubPageHeapCaches, idx);
	}

	private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
		int idx = PoolArena.smallIdx(normCapacity);
		if (area.isDirect()) {
			return cache(smallSubPageDirectCaches, idx);
		}
		return cache(smallSubPageHeapCaches, idx);
	}

	private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
		if (area.isDirect()) {
			int idx = log2(normCapacity >> numShiftsNormalDirect);
			return cache(normalDirectCaches, idx);
		}
		int idx = log2(normCapacity >> numShiftsNormalHeap);
		return cache(normalHeapCaches, idx);
	}

	// Bounds-checked array lookup; returns null (cache miss) for out-of-range indices.
	private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
		if (cache == null || idx > cache.length - 1) {
			return null;
		}
		return cache[idx];
	}

	/**
	 * Cache used for buffers which are backed by TINY or SMALL size.
	 */
	private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
		SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
			super(size, sizeClass);
		}

		@Override
		protected void initBuf(
			PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
			chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
		}
	}

	/**
	 * Cache used for buffers which are backed by NORMAL size.
	 */
	private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
		NormalMemoryRegionCache(int size) {
			super(size, SizeClass.Normal);
		}

		@Override
		protected void initBuf(
			PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
			chunk.initBuf(buf, nioBuffer, handle, reqCapacity);
		}
	}

	/**
	 * A bounded MPSC queue of freed (chunk, handle) pairs for one size class,
	 * from which subsequent allocations of the same size can be served without
	 * touching the arena.
	 */
	private abstract static class MemoryRegionCache<T> {
		private final int size;
		private final Queue<Entry<T>> queue;
		private final SizeClass sizeClass;
		private int allocations;

		MemoryRegionCache(int size, SizeClass sizeClass) {
			this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
			queue = PlatformDependent.newFixedMpscQueue(this.size);
			this.sizeClass = sizeClass;
		}

		/**
		 * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
		 */
		protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
										PooledByteBuf<T> buf, int reqCapacity);

		/**
		 * Add to cache if not already full.
		 */
		@SuppressWarnings("unchecked")
		public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
			Entry<T> entry = newEntry(chunk, nioBuffer, handle);
			boolean queued = queue.offer(entry);
			if (!queued) {
				// If it was not possible to cache the chunk, immediately recycle the entry
				entry.recycle();
			}

			return queued;
		}

		/**
		 * Allocate something out of the cache if possible and remove the entry from the cache.
		 */
		public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
			Entry<T> entry = queue.poll();
			if (entry == null) {
				return false;
			}
			initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
			entry.recycle();

			// allocations is not thread-safe which is fine as this is only called from the same thread all time.
			++ allocations;
			return true;
		}

		/**
		 * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
		 */
		public final int free(boolean finalizer) {
			return free(Integer.MAX_VALUE, finalizer);
		}

		private int free(int max, boolean finalizer) {
			int numFreed = 0;
			for (; numFreed < max; numFreed++) {
				Entry<T> entry = queue.poll();
				if (entry != null) {
					freeEntry(entry, finalizer);
				} else {
					// all cleared
					return numFreed;
				}
			}
			return numFreed;
		}

		/**
		 * Free up cached {@link PoolChunk}s if not allocated frequently enough.
		 */
		public final void trim() {
			int free = size - allocations;
			allocations = 0;

			// Fewer allocations than capacity since the last sweep: release the surplus entries.
			if (free > 0) {
				free(free, false);
			}
		}

		@SuppressWarnings({ "unchecked", "rawtypes" })
		private  void freeEntry(Entry entry, boolean finalizer) {
			PoolChunk chunk = entry.chunk;
			long handle = entry.handle;
			ByteBuffer nioBuffer = entry.nioBuffer;

			if (!finalizer) {
				// recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
				// a finalizer.
				// NOTE(review): skipping recycle() on the finalizer path appears to be the core of the
				// NETTY#8955 cherry-pick this vendored copy exists for (per the commit message, it fixes a
				// TM Metaspace leak) -- presumably recycling from the finalizer thread would populate that
				// thread's Recycler state and pin classes in memory; confirm against upstream NETTY#8955.
				entry.recycle();
			}

			chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer, finalizer);
		}

		static final class Entry<T> {
			final Handle<Entry<?>> recyclerHandle;
			PoolChunk<T> chunk;
			ByteBuffer nioBuffer;
			long handle = -1;

			Entry(Handle<Entry<?>> recyclerHandle) {
				this.recyclerHandle = recyclerHandle;
			}

			void recycle() {
				// Null out references before returning to the Recycler so the
				// pooled Entry does not keep the chunk/buffer reachable.
				chunk = null;
				nioBuffer = null;
				handle = -1;
				recyclerHandle.recycle(this);
			}
		}

		@SuppressWarnings("rawtypes")
		private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
			Entry entry = RECYCLER.get();
			entry.chunk = chunk;
			entry.nioBuffer = nioBuffer;
			entry.handle = handle;
			return entry;
		}

		@SuppressWarnings("rawtypes")
		private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
			@SuppressWarnings("unchecked")
			@Override
			protected Entry newObject(Handle<Entry> handle) {
				return new Entry(handle);
			}
		};
	}
}
diff --git 
a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java
 
b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java
new file mode 100644
index 0000000..2124bdc
--- /dev/null
+++ 
b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p21p0/io/netty/buffer/PooledByteBufAllocator.java
@@ -0,0 +1,640 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.beam.vendor.grpc.v1p21p0.io.netty.buffer;
+
+import static 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.NettyRuntime;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocal;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.concurrent.FastThreadLocalThread;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.PlatformDependent;
+import org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.StringUtil;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.SystemPropertyUtil;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLogger;
+import 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+// This class is copied from Netty's io.netty.buffer.PooledByteBufAllocator,
+// can be removed after Beam bumps its shaded netty version to 1.22+ 
(BEAM-9030).
+//
+// Changed lines: 458
+
+public class PooledByteBufAllocator extends AbstractByteBufAllocator 
implements ByteBufAllocatorMetricProvider {
+
+       private static final InternalLogger logger = 
InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
+       private static final int DEFAULT_NUM_HEAP_ARENA;
+       private static final int DEFAULT_NUM_DIRECT_ARENA;
+
+       private static final int DEFAULT_PAGE_SIZE;
+       private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per 
chunk
+       private static final int DEFAULT_TINY_CACHE_SIZE;
+       private static final int DEFAULT_SMALL_CACHE_SIZE;
+       private static final int DEFAULT_NORMAL_CACHE_SIZE;
+       private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
+       private static final int DEFAULT_CACHE_TRIM_INTERVAL;
+       private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
+       private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
+       static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
+
+       private static final int MIN_PAGE_SIZE = 4096;
+       private static final int MAX_CHUNK_SIZE = (int) (((long) 
Integer.MAX_VALUE + 1) / 2);
+
+       static {
+               int defaultPageSize = 
SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
+               Throwable pageSizeFallbackCause = null;
+               try {
+                       validateAndCalculatePageShifts(defaultPageSize);
+               } catch (Throwable t) {
+                       pageSizeFallbackCause = t;
+                       defaultPageSize = 8192;
+               }
+               DEFAULT_PAGE_SIZE = defaultPageSize;
+
+               int defaultMaxOrder = 
SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
+               Throwable maxOrderFallbackCause = null;
+               try {
+                       validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, 
defaultMaxOrder);
+               } catch (Throwable t) {
+                       maxOrderFallbackCause = t;
+                       defaultMaxOrder = 11;
+               }
+               DEFAULT_MAX_ORDER = defaultMaxOrder;
+
+               // Determine reasonable default for nHeapArena and nDirectArena.
+               // Assuming each arena has 3 chunks, the pool should not 
consume more than 50% of max memory.
+               final Runtime runtime = Runtime.getRuntime();
+
+               /*
+                * We use 2 * available processors by default to reduce 
contention as we use 2 * available processors for the
+                * number of EventLoops in NIO and EPOLL as well. If we choose 
a smaller number we will run into hot spots as
+                * allocation and de-allocation needs to be synchronized on the 
PoolArena.
+                *
+                * See https://github.com/netty/netty/issues/3888.
+                */
+               final int defaultMinNumArena = 
NettyRuntime.availableProcessors() * 2;
+               final int defaultChunkSize = DEFAULT_PAGE_SIZE << 
DEFAULT_MAX_ORDER;
+               DEFAULT_NUM_HEAP_ARENA = Math.max(0,
+                       SystemPropertyUtil.getInt(
+                               "io.netty.allocator.numHeapArenas",
+                               (int) Math.min(
+                                       defaultMinNumArena,
+                                       runtime.maxMemory() / defaultChunkSize 
/ 2 / 3)));
+               DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
+                       SystemPropertyUtil.getInt(
+                               "io.netty.allocator.numDirectArenas",
+                               (int) Math.min(
+                                       defaultMinNumArena,
+                                       PlatformDependent.maxDirectMemory() / 
defaultChunkSize / 2 / 3)));
+
+               // cache sizes
+               DEFAULT_TINY_CACHE_SIZE = 
SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
+               DEFAULT_SMALL_CACHE_SIZE = 
SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
+               DEFAULT_NORMAL_CACHE_SIZE = 
SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
+
+               // 32 kb is the default maximum capacity of the cached buffer. 
Similar to what is explained in
+               // 'Scalable memory allocation using jemalloc'
+               DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
+                       "io.netty.allocator.maxCachedBufferCapacity", 32 * 
1024);
+
+               // the number of threshold of allocations when cached entries 
will be freed up if not frequently used
+               DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
+                       "io.netty.allocator.cacheTrimInterval", 8192);
+
+               DEFAULT_USE_CACHE_FOR_ALL_THREADS = 
SystemPropertyUtil.getBoolean(
+                       "io.netty.allocator.useCacheForAllThreads", true);
+
+               DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = 
SystemPropertyUtil.getInt(
+                       "io.netty.allocator.directMemoryCacheAlignment", 0);
+
+               // Use 1023 by default as we use an ArrayDeque as backing 
storage which will then allocate an internal array
+               // of 1024 elements. Otherwise we would allocate 2048 and only 
use 1024 which is wasteful.
+               DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = 
SystemPropertyUtil.getInt(
+                       "io.netty.allocator.maxCachedByteBuffersPerChunk", 
1023);
+
+               if (logger.isDebugEnabled()) {
+                       logger.debug("-Dio.netty.allocator.numHeapArenas: {}", 
DEFAULT_NUM_HEAP_ARENA);
+                       logger.debug("-Dio.netty.allocator.numDirectArenas: 
{}", DEFAULT_NUM_DIRECT_ARENA);
+                       if (pageSizeFallbackCause == null) {
+                               logger.debug("-Dio.netty.allocator.pageSize: 
{}", DEFAULT_PAGE_SIZE);
+                       } else {
+                               logger.debug("-Dio.netty.allocator.pageSize: 
{}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
+                       }
+                       if (maxOrderFallbackCause == null) {
+                               logger.debug("-Dio.netty.allocator.maxOrder: 
{}", DEFAULT_MAX_ORDER);
+                       } else {
+                               logger.debug("-Dio.netty.allocator.maxOrder: 
{}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
+                       }
+                       logger.debug("-Dio.netty.allocator.chunkSize: {}", 
DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
+                       logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", 
DEFAULT_TINY_CACHE_SIZE);
+                       logger.debug("-Dio.netty.allocator.smallCacheSize: {}", 
DEFAULT_SMALL_CACHE_SIZE);
+                       logger.debug("-Dio.netty.allocator.normalCacheSize: 
{}", DEFAULT_NORMAL_CACHE_SIZE);
+                       
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", 
DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
+                       logger.debug("-Dio.netty.allocator.cacheTrimInterval: 
{}", DEFAULT_CACHE_TRIM_INTERVAL);
+                       
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", 
DEFAULT_USE_CACHE_FOR_ALL_THREADS);
+                       
logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
+                               DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
+               }
+       }
+
	/**
	 * Shared default instance; prefers direct buffers only when the platform reports
	 * direct buffers as preferred (see {@link PlatformDependent#directBufferPreferred()}).
	 */
	public static final PooledByteBufAllocator DEFAULT =
		new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

	// Arena arrays are null when the corresponding arena count is 0 (pooling disabled
	// for that memory kind); see the main constructor.
	private final PoolArena<byte[]>[] heapArenas;
	private final PoolArena<ByteBuffer>[] directArenas;
	// Per-thread cache sizing, passed to each PoolThreadCache created by threadCache.
	private final int tinyCacheSize;
	private final int smallCacheSize;
	private final int normalCacheSize;
	// Unmodifiable metric views over the arenas (empty lists when pooling is disabled).
	private final List<PoolArenaMetric> heapArenaMetrics;
	private final List<PoolArenaMetric> directArenaMetrics;
	// FastThreadLocal that binds each thread to its least-used arenas.
	private final PoolThreadLocalCache threadCache;
	// pageSize << maxOrder, computed by validateAndCalculateChunkSize.
	private final int chunkSize;
	private final PooledByteBufAllocatorMetric metric;
+
	/**
	 * Creates an allocator that prefers heap buffers, with all defaults.
	 */
	public PooledByteBufAllocator() {
		this(false);
	}

	/**
	 * Creates an allocator with default arena counts, page size and max order.
	 *
	 * @param preferDirect whether direct buffers should be preferred
	 */
	@SuppressWarnings("deprecation")
	public PooledByteBufAllocator(boolean preferDirect) {
		this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
	}

	/**
	 * Creates a heap-preferring allocator with explicit arena counts and chunk geometry.
	 */
	@SuppressWarnings("deprecation")
	public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
		this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
	}

	/**
	 * @deprecated use
	 * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
	 */
	@Deprecated
	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
		this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
			DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
	}

	/**
	 * @deprecated use
	 * {@link PooledByteBufAllocator#PooledByteBufAllocator(boolean, int, int, int, int, int, int, int, boolean)}
	 */
	@Deprecated
	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
								  int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
		this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
			normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
	}

	/**
	 * Creates an allocator with explicit per-thread cache sizes; uses the default
	 * direct-memory cache alignment.
	 */
	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
								  int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
								  int smallCacheSize, int normalCacheSize,
								  boolean useCacheForAllThreads) {
		this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
			tinyCacheSize, smallCacheSize, normalCacheSize,
			useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
	}
+
	/**
	 * Fully-parameterized constructor: validates all sizing arguments and builds the
	 * heap and direct arena arrays together with their metric views.
	 *
	 * @param preferDirect               whether direct buffers should be preferred
	 * @param nHeapArena                 number of heap arenas; 0 disables heap pooling
	 * @param nDirectArena               number of direct arenas; 0 disables direct pooling
	 * @param pageSize                   page size; must be a power of two >= MIN_PAGE_SIZE
	 * @param maxOrder                   chunk order; chunkSize = pageSize << maxOrder, max 14
	 * @param tinyCacheSize              per-thread tiny cache size
	 * @param smallCacheSize             per-thread small cache size
	 * @param normalCacheSize            per-thread normal cache size
	 * @param useCacheForAllThreads      cache for every thread, not only FastThreadLocalThreads
	 * @param directMemoryCacheAlignment direct-memory alignment; must be 0 or a power of two,
	 *                                   and requires sun.misc.Unsafe when non-zero
	 * @throws IllegalArgumentException if any sizing/alignment argument is invalid
	 */
	public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
								  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
								  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
		super(preferDirect);
		threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
		this.tinyCacheSize = tinyCacheSize;
		this.smallCacheSize = smallCacheSize;
		this.normalCacheSize = normalCacheSize;
		// Also validates pageSize/maxOrder; throws before any arena is constructed.
		chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

		checkPositiveOrZero(nHeapArena, "nHeapArena");
		checkPositiveOrZero(nDirectArena, "nDirectArena");

		checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
		if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
			throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
		}

		// x & -x == x only for 0 and powers of two.
		if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
			throw new IllegalArgumentException("directMemoryCacheAlignment: "
				+ directMemoryCacheAlignment + " (expected: power of two)");
		}

		int pageShifts = validateAndCalculatePageShifts(pageSize);

		if (nHeapArena > 0) {
			heapArenas = newArenaArray(nHeapArena);
			List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
			for (int i = 0; i < heapArenas.length; i ++) {
				PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
					pageSize, maxOrder, pageShifts, chunkSize,
					directMemoryCacheAlignment);
				heapArenas[i] = arena;
				metrics.add(arena);
			}
			heapArenaMetrics = Collections.unmodifiableList(metrics);
		} else {
			// Heap pooling disabled: newHeapBuffer falls back to unpooled buffers.
			heapArenas = null;
			heapArenaMetrics = Collections.emptyList();
		}

		if (nDirectArena > 0) {
			directArenas = newArenaArray(nDirectArena);
			List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
			for (int i = 0; i < directArenas.length; i ++) {
				PoolArena.DirectArena arena = new PoolArena.DirectArena(
					this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
				directArenas[i] = arena;
				metrics.add(arena);
			}
			directArenaMetrics = Collections.unmodifiableList(metrics);
		} else {
			// Direct pooling disabled: newDirectBuffer falls back to unpooled buffers.
			directArenas = null;
			directArenaMetrics = Collections.emptyList();
		}
		metric = new PooledByteBufAllocatorMetric(this);
	}
+
	// Generic arrays cannot be created directly; the raw-typed allocation is safe because
	// the array is only populated with PoolArena<T> instances by the constructor.
	@SuppressWarnings("unchecked")
	private static <T> PoolArena<T>[] newArenaArray(int size) {
		return new PoolArena[size];
	}
+
+       private static int validateAndCalculatePageShifts(int pageSize) {
+               if (pageSize < MIN_PAGE_SIZE) {
+                       throw new IllegalArgumentException("pageSize: " + 
pageSize + " (expected: " + MIN_PAGE_SIZE + ")");
+               }
+
+               if ((pageSize & pageSize - 1) != 0) {
+                       throw new IllegalArgumentException("pageSize: " + 
pageSize + " (expected: power of 2)");
+               }
+
+               // Logarithm base 2. At this point we know that pageSize is a 
power of two.
+               return Integer.SIZE - 1 - 
Integer.numberOfLeadingZeros(pageSize);
+       }
+
+       private static int validateAndCalculateChunkSize(int pageSize, int 
maxOrder) {
+               if (maxOrder > 14) {
+                       throw new IllegalArgumentException("maxOrder: " + 
maxOrder + " (expected: 0-14)");
+               }
+
+               // Ensure the resulting chunkSize does not overflow.
+               int chunkSize = pageSize;
+               for (int i = maxOrder; i > 0; i --) {
+                       if (chunkSize > MAX_CHUNK_SIZE / 2) {
+                               throw new 
IllegalArgumentException(String.format(
+                                       "pageSize (%d) << maxOrder (%d) must 
not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
+                       }
+                       chunkSize <<= 1;
+               }
+               return chunkSize;
+       }
+
+       @Override
+       protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+               PoolThreadCache cache = threadCache.get();
+               PoolArena<byte[]> heapArena = cache.heapArena;
+
+               final ByteBuf buf;
+               if (heapArena != null) {
+                       buf = heapArena.allocate(cache, initialCapacity, 
maxCapacity);
+               } else {
+                       buf = PlatformDependent.hasUnsafe() ?
+                               new UnpooledUnsafeHeapByteBuf(this, 
initialCapacity, maxCapacity) :
+                               new UnpooledHeapByteBuf(this, initialCapacity, 
maxCapacity);
+               }
+
+               return toLeakAwareBuffer(buf);
+       }
+
+       @Override
+       protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) 
{
+               PoolThreadCache cache = threadCache.get();
+               PoolArena<ByteBuffer> directArena = cache.directArena;
+
+               final ByteBuf buf;
+               if (directArena != null) {
+                       buf = directArena.allocate(cache, initialCapacity, 
maxCapacity);
+               } else {
+                       buf = PlatformDependent.hasUnsafe() ?
+                               UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, 
initialCapacity, maxCapacity) :
+                               new UnpooledDirectByteBuf(this, 
initialCapacity, maxCapacity);
+               }
+
+               return toLeakAwareBuffer(buf);
+       }
+
	/**
	 * Default number of heap arenas - System Property: io.netty.allocator.numHeapArenas - default 2 * cores
	 */
	public static int defaultNumHeapArena() {
		return DEFAULT_NUM_HEAP_ARENA;
	}

	/**
	 * Default number of direct arenas - System Property: io.netty.allocator.numDirectArenas - default 2 * cores
	 */
	public static int defaultNumDirectArena() {
		return DEFAULT_NUM_DIRECT_ARENA;
	}

	/**
	 * Default buffer page size - System Property: io.netty.allocator.pageSize - default 8192
	 */
	public static int defaultPageSize() {
		return DEFAULT_PAGE_SIZE;
	}

	/**
	 * Default maximum order - System Property: io.netty.allocator.maxOrder - default 11
	 */
	public static int defaultMaxOrder() {
		return DEFAULT_MAX_ORDER;
	}

	/**
	 * Default thread caching behavior - System Property: io.netty.allocator.useCacheForAllThreads - default true
	 */
	public static boolean defaultUseCacheForAllThreads() {
		return DEFAULT_USE_CACHE_FOR_ALL_THREADS;
	}

	/**
	 * Default prefer direct - System Property: io.netty.noPreferDirect - default false
	 */
	public static boolean defaultPreferDirect() {
		return PlatformDependent.directBufferPreferred();
	}

	/**
	 * Default tiny cache size - System Property: io.netty.allocator.tinyCacheSize - default 512
	 */
	public static int defaultTinyCacheSize() {
		return DEFAULT_TINY_CACHE_SIZE;
	}

	/**
	 * Default small cache size - System Property: io.netty.allocator.smallCacheSize - default 256
	 */
	public static int defaultSmallCacheSize() {
		return DEFAULT_SMALL_CACHE_SIZE;
	}

	/**
	 * Default normal cache size - System Property: io.netty.allocator.normalCacheSize - default 64
	 */
	public static int defaultNormalCacheSize() {
		return DEFAULT_NORMAL_CACHE_SIZE;
	}

	/**
	 * Return {@code true} if direct memory cache alignment is supported, {@code false} otherwise.
	 * Alignment requires sun.misc.Unsafe, hence the delegation to {@link PlatformDependent#hasUnsafe()}.
	 */
	public static boolean isDirectMemoryCacheAlignmentSupported() {
		return PlatformDependent.hasUnsafe();
	}
+
	/**
	 * Returns {@code true} when direct buffers are pooled, i.e. at least one direct
	 * arena was configured.
	 */
	@Override
	public boolean isDirectBufferPooled() {
		return directArenas != null;
	}

	/**
	 * Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
	 * buffers.
	 */
	@Deprecated
	public boolean hasThreadLocalCache() {
		return threadCache.isSet();
	}

	/**
	 * Free all cached buffers for the calling {@link Thread}.
	 */
	@Deprecated
	public void freeThreadLocalCache() {
		threadCache.remove();
	}
+
	/**
	 * Thread-local holder that lazily binds each thread to a {@link PoolThreadCache}
	 * backed by the currently least-used heap and direct arenas.
	 */
	final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
		// When false, only FastThreadLocalThread instances get a real cache.
		private final boolean useCacheForAllThreads;

		PoolThreadLocalCache(boolean useCacheForAllThreads) {
			this.useCacheForAllThreads = useCacheForAllThreads;
		}

		// synchronized so that concurrent first accesses observe consistent
		// numThreadCaches counts when picking the least-used arenas.
		@Override
		protected synchronized PoolThreadCache initialValue() {
			final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
			final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

			Thread current = Thread.currentThread();
			if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
				return new PoolThreadCache(
					heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
					DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
			}
			// No caching so just use 0 as sizes.
			return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
		}

		// Called when the thread-local is removed; releases the cache's buffers.
		// NOTE(review): the 'false' argument's meaning is defined in PoolThreadCache.free,
		// which is outside this block — confirm against the vendored PoolThreadCache.
		@Override
		protected void onRemoval(PoolThreadCache threadCache) {
			threadCache.free(false);
		}

		// Returns the arena currently serving the fewest thread caches, or null when
		// pooling for that memory kind is disabled.
		private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
			if (arenas == null || arenas.length == 0) {
				return null;
			}

			PoolArena<T> minArena = arenas[0];
			for (int i = 1; i < arenas.length; i++) {
				PoolArena<T> arena = arenas[i];
				if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
					minArena = arena;
				}
			}

			return minArena;
		}
	}
+
	/**
	 * Returns the metric object exposing this allocator's arena and cache statistics.
	 */
	@Override
	public PooledByteBufAllocatorMetric metric() {
		return metric;
	}

	/**
	 * Return the number of heap arenas.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#numHeapArenas()}.
	 */
	@Deprecated
	public int numHeapArenas() {
		return heapArenaMetrics.size();
	}

	/**
	 * Return the number of direct arenas.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#numDirectArenas()}.
	 */
	@Deprecated
	public int numDirectArenas() {
		return directArenaMetrics.size();
	}

	/**
	 * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#heapArenas()}.
	 */
	@Deprecated
	public List<PoolArenaMetric> heapArenas() {
		return heapArenaMetrics;
	}

	/**
	 * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#directArenas()}.
	 */
	@Deprecated
	public List<PoolArenaMetric> directArenas() {
		return directArenaMetrics;
	}

	/**
	 * Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#numThreadLocalCaches()}.
	 */
	@Deprecated
	public int numThreadLocalCaches() {
		// Heap and direct arenas serve the same set of thread caches, so counting
		// either array (preferring heap) suffices.
		PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas;
		if (arenas == null) {
			return 0;
		}

		int total = 0;
		for (PoolArena<?> arena : arenas) {
			total += arena.numThreadCaches.get();
		}

		return total;
	}

	/**
	 * Return the size of the tiny cache.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#tinyCacheSize()}.
	 */
	@Deprecated
	public int tinyCacheSize() {
		return tinyCacheSize;
	}

	/**
	 * Return the size of the small cache.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#smallCacheSize()}.
	 */
	@Deprecated
	public int smallCacheSize() {
		return smallCacheSize;
	}

	/**
	 * Return the size of the normal cache.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#normalCacheSize()}.
	 */
	@Deprecated
	public int normalCacheSize() {
		return normalCacheSize;
	}

	/**
	 * Return the chunk size for an arena.
	 *
	 * @deprecated use {@link PooledByteBufAllocatorMetric#chunkSize()}.
	 */
	@Deprecated
	public final int chunkSize() {
		return chunkSize;
	}
+
	// Total bytes of active heap allocations across all heap arenas, or -1 when
	// heap pooling is disabled.
	final long usedHeapMemory() {
		return usedMemory(heapArenas);
	}

	// Total bytes of active direct allocations across all direct arenas, or -1 when
	// direct pooling is disabled.
	final long usedDirectMemory() {
		return usedMemory(directArenas);
	}
+
+       private static long usedMemory(PoolArena<?>[] arenas) {
+               if (arenas == null) {
+                       return -1;
+               }
+               long used = 0;
+               for (PoolArena<?> arena : arenas) {
+                       used += arena.numActiveBytes();
+                       if (used < 0) {
+                               return Long.MAX_VALUE;
+                       }
+               }
+               return used;
+       }
+
	// Returns the calling thread's cache; FastThreadLocal.get() initializes it on first
	// access, so the result is never null.
	final PoolThreadCache threadCache() {
		PoolThreadCache cache =  threadCache.get();
		assert cache != null;
		return cache;
	}
+
+       /**
+        * Returns the status of the allocator (which contains all metrics) as 
string. Be aware this may be expensive
+        * and so should not called too frequently.
+        */
+       public String dumpStats() {
+               int heapArenasLen = heapArenas == null ? 0 : heapArenas.length;
+               StringBuilder buf = new StringBuilder(512)
+                       .append(heapArenasLen)
+                       .append(" heap arena(s):")
+                       .append(StringUtil.NEWLINE);
+               if (heapArenasLen > 0) {
+                       for (PoolArena<byte[]> a: heapArenas) {
+                               buf.append(a);
+                       }
+               }
+
+               int directArenasLen = directArenas == null ? 0 : 
directArenas.length;
+
+               buf.append(directArenasLen)
+                       .append(" direct arena(s):")
+                       .append(StringUtil.NEWLINE);
+               if (directArenasLen > 0) {
+                       for (PoolArena<ByteBuffer> a: directArenas) {
+                               buf.append(a);
+                       }
+               }
+
+               return buf.toString();
+       }
+}
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 4a56880..ff3eb79 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -28,6 +28,9 @@ under the License.
                <!-- Sometimes we have to temporarily fix very long, different 
formatted Calcite files. -->
                <suppress files="org[\\/]apache[\\/]calcite.*" 
checks="[a-zA-Z0-9]*"/>
 
+               <!-- Temporarily fix TM Metaspace memory leak caused by Apache 
Beam sdk harness. -->
+               <suppress 
files="org[\\/]apache[\\/]beam[\\/]vendor[\\/]grpc[\\/]v1p21p0[\\/]io[\\/]netty[\\/]buffer.*.java"
 checks="[a-zA-Z0-9]*"/>
+
                <!-- Python streaming API follows python naming conventions -->
                <suppress
                        
files="org[\\/]apache[\\/]flink[\\/]streaming[\\/]python[\\/]api[\\/].*.java"

Reply via email to