This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ce4329c27 [CELEBORN-1079] Fix use of GuardedBy in client-flink/common
ce4329c27 is described below
commit ce4329c2715274ea0cc7536944d1ea47f405b46e
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Tue Oct 24 09:35:35 2023 +0800
[CELEBORN-1079] Fix use of GuardedBy in client-flink/common
### What changes were proposed in this pull request?
* Fix use of `GuardedBy` on nonexistant lock.
* Annotate methods, which are expected to be called with lock held, with
`GuardedBy` so that error prone can analyze all invocations
### Why are the changes needed?
There is no functional change, but it helps errorprone analysis.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
Closes #2029 from mridulm/fix-flink-guarded-by-annotation.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java | 3 ---
.../org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java | 2 ++
2 files changed, 2 insertions(+), 3 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
index 1b40508c9..0c2a0a2bb 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.core.memory.MemorySegment;
@@ -63,7 +62,6 @@ public class PartitionSortedBuffer implements SortBuffer {
private final BufferPool bufferPool;
/** A segment list as a joint buffer which stores all records and index
entries. */
- @GuardedBy("lock")
private final ArrayList<MemorySegment> buffers = new ArrayList<>();
/** Addresses of the first record's index entry for each subpartition. */
@@ -92,7 +90,6 @@ public class PartitionSortedBuffer implements SortBuffer {
// For writing
//
---------------------------------------------------------------------------------------------
/** Whether this sort buffer is released. A released sort buffer can not be
used. */
- @GuardedBy("lock")
private boolean isReleased;
/** Array index in the segment list of the current available buffer for
writing. */
private int writeSegmentIndex;
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
index 2e07fd168..e1f1ee1f5 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/TransferBufferPool.java
@@ -155,6 +155,7 @@ public class TransferBufferPool implements BufferRecycler {
}
}
+ @GuardedBy("lock")
private int assignCredits(CreditListener creditListener) {
assert Thread.holdsLock(lock);
@@ -176,6 +177,7 @@ public class TransferBufferPool implements BufferRecycler {
return numCredits;
}
+ @GuardedBy("lock")
private List<CreditAssignment> dispatchReservedCredits() {
assert Thread.holdsLock(lock);