This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0b1433c2379d1847cc906e7481c0f09dabc13d92 Author: Weijie Guo <res...@163.com> AuthorDate: Fri Mar 3 00:03:35 2023 +0800 [hotfix] Add missing @GuardedBy annotation for LocalBufferPool. --- .../flink/runtime/io/network/buffer/LocalBufferPool.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 7e6168ee362..bb14a7d69c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -295,7 +295,9 @@ class LocalBufferPool implements BufferPool { @VisibleForTesting public int getNumberOfRequestedOverdraftMemorySegments() { - return numberOfRequestedOverdraftMemorySegments; + synchronized (availableMemorySegments) { + return numberOfRequestedOverdraftMemorySegments; + } } @Override @@ -312,6 +314,8 @@ class LocalBufferPool implements BufferPool { } } + // suppress the FieldAccessNotGuarded warning as this method is unsafe by design. + @SuppressWarnings("FieldAccessNotGuarded") @Override public int bestEffortGetNumOfUsedBuffers() { return Math.max( @@ -413,6 +417,7 @@ class LocalBufferPool implements BufferPool { return segment; } + @GuardedBy("availableMemorySegments") private void checkDestroyed() { if (isDestroyed) { throw new CancelTaskException("Buffer pool has already been destroyed."); @@ -424,6 +429,7 @@ class LocalBufferPool implements BufferPool { return requestMemorySegment(UNKNOWN_CHANNEL); } + @GuardedBy("availableMemorySegments") private boolean requestMemorySegmentFromGlobal() { assert Thread.holdsLock(availableMemorySegments); @@ -444,6 +450,7 @@ class LocalBufferPool implements BufferPool { return false; } + @GuardedBy("availableMemorySegments") private MemorySegment requestOverdraftMemorySegmentFromGlobal() { assert Thread.holdsLock(availableMemorySegments); @@ -499,6 +506,7 @@ class LocalBufferPool implements BufferPool { mayNotifyAvailable(toNotify); } + @GuardedBy("availableMemorySegments") private boolean shouldBeAvailable() { assert Thread.holdsLock(availableMemorySegments); @@ -551,6 +559,7 @@ class LocalBufferPool implements BufferPool { shouldBeAvailable(), needRequestingNotificationOfGlobalPoolAvailable); } + @GuardedBy("availableMemorySegments") private void checkConsistentAvailability() { assert Thread.holdsLock(availableMemorySegments); @@ -719,6 +728,7 @@ class LocalBufferPool implements BufferPool { } } + @GuardedBy("availableMemorySegments") private void returnMemorySegment(MemorySegment segment) { assert Thread.holdsLock(availableMemorySegments); @@ -731,6 +741,7 @@ class LocalBufferPool implements BufferPool { networkBufferPool.recyclePooledMemorySegment(segment); } + @GuardedBy("availableMemorySegments") private void returnExcessMemorySegments() { assert Thread.holdsLock(availableMemorySegments); @@ -744,11 +755,13 @@ class LocalBufferPool implements BufferPool { } } + @GuardedBy("availableMemorySegments") private boolean hasExcessBuffers() { return numberOfRequestedOverdraftMemorySegments > 0 || numberOfRequestedMemorySegments > currentPoolSize; } + @GuardedBy("availableMemorySegments") private boolean isRequestedSizeReached() { return numberOfRequestedMemorySegments >= currentPoolSize; }