This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 6dc30d39765f6312579856d421b5f89e26f95a7c Author: Mridul Muralidharan <[email protected]> AuthorDate: Tue Oct 24 09:35:35 2023 +0800 [CELEBORN-1079] Fix use of GuardedBy in client-flink/common * Fix use of `GuardedBy` on nonexistent lock. * Annotate methods, which are expected to be called with lock held, with `GuardedBy` so that Error Prone can analyze all invocations. There is no functional change, but it helps Error Prone analysis. No Unit tests Closes #2029 from mridulm/fix-flink-guarded-by-annotation. Authored-by: Mridul Muralidharan <mridulatgmail.com> Signed-off-by: zky.zhoukeyong <[email protected]> --- .../org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java | 3 --- .../org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java | 2 ++ 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java index 1b40508c9..0c2a0a2bb 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java @@ -28,7 +28,6 @@ import java.util.ArrayList; import java.util.Arrays; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; import org.apache.flink.core.memory.MemorySegment; @@ -63,7 +62,6 @@ public class PartitionSortedBuffer implements SortBuffer { private final BufferPool bufferPool; /** A segment list as a joint buffer which stores all records and index entries. */ - @GuardedBy("lock") private final ArrayList<MemorySegment> buffers = new ArrayList<>(); /** Addresses of the first record's index entry for each subpartition. 
*/ @@ -92,7 +90,6 @@ public class PartitionSortedBuffer implements SortBuffer { // For writing // --------------------------------------------------------------------------------------------- /** Whether this sort buffer is released. A released sort buffer can not be used. */ - @GuardedBy("lock") private boolean isReleased; /** Array index in the segment list of the current available buffer for writing. */ private int writeSegmentIndex; diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java index 2e07fd168..e1f1ee1f5 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java @@ -155,6 +155,7 @@ public class TransferBufferPool implements BufferRecycler { } } + @GuardedBy("lock") private int assignCredits(CreditListener creditListener) { assert Thread.holdsLock(lock); @@ -176,6 +177,7 @@ public class TransferBufferPool implements BufferRecycler { return numCredits; } + @GuardedBy("lock") private List<CreditAssignment> dispatchReservedCredits() { assert Thread.holdsLock(lock);
