This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 6dc30d39765f6312579856d421b5f89e26f95a7c
Author: Mridul Muralidharan <[email protected]>
AuthorDate: Tue Oct 24 09:35:35 2023 +0800

    [CELEBORN-1079] Fix use of GuardedBy in client-flink/common
    
    * Fix use of `GuardedBy` on nonexistant lock.
    * Annotate methods, which are expected to be called with lock held, with 
`GuardedBy` so that error prone can analyze all invocations
    
    There is no functional change, but it helps errorprone analysis.
    
    No
    
    Unit tests
    
    Closes #2029 from mridulm/fix-flink-guarded-by-annotation.
    
    Authored-by: Mridul Muralidharan <mridulatgmail.com>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java | 3 ---
 .../org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java    | 2 ++
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
index 1b40508c9..0c2a0a2bb 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.flink.core.memory.MemorySegment;
@@ -63,7 +62,6 @@ public class PartitionSortedBuffer implements SortBuffer {
   private final BufferPool bufferPool;
 
   /** A segment list as a joint buffer which stores all records and index 
entries. */
-  @GuardedBy("lock")
   private final ArrayList<MemorySegment> buffers = new ArrayList<>();
 
   /** Addresses of the first record's index entry for each subpartition. */
@@ -92,7 +90,6 @@ public class PartitionSortedBuffer implements SortBuffer {
   // For writing
   // 
---------------------------------------------------------------------------------------------
   /** Whether this sort buffer is released. A released sort buffer can not be 
used. */
-  @GuardedBy("lock")
   private boolean isReleased;
   /** Array index in the segment list of the current available buffer for 
writing. */
   private int writeSegmentIndex;
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
index 2e07fd168..e1f1ee1f5 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
@@ -155,6 +155,7 @@ public class TransferBufferPool implements BufferRecycler {
     }
   }
 
+  @GuardedBy("lock")
   private int assignCredits(CreditListener creditListener) {
     assert Thread.holdsLock(lock);
 
@@ -176,6 +177,7 @@ public class TransferBufferPool implements BufferRecycler {
     return numCredits;
   }
 
+  @GuardedBy("lock")
   private List<CreditAssignment> dispatchReservedCredits() {
     assert Thread.holdsLock(lock);
 

Reply via email to