This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new f8d00a2  [FLINK-25654][network] Remove the redundant locks in 
SortMergeResultPartition and PartitionSortedBuffer.
f8d00a2 is described below

commit f8d00a2f8acf62df41f181ac867e3a57820972eb
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Jan 14 17:46:23 2022 +0800

    [FLINK-25654][network] Remove the redundant locks in 
SortMergeResultPartition and PartitionSortedBuffer.
    
    After FLINK-2372, the task canceler will never call the close method of 
ResultPartition, which can reduce some race conditions and simplify the code. 
This PR aims to remove some redundant locks in SortMergeResultPartition and 
PartitionSortedBuffer.
    
    This closes #18364.
---
 .../network/partition/PartitionSortedBuffer.java   | 53 ++++++++--------------
 .../partition/SortMergeResultPartition.java        | 44 +++++++-----------
 .../partition/PartitionSortedBufferTest.java       | 10 +---
 3 files changed, 38 insertions(+), 69 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java
index b432d30..9f0ca0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
@@ -53,8 +52,6 @@ import static org.apache.flink.util.Preconditions.checkState;
 @NotThreadSafe
 public class PartitionSortedBuffer implements SortBuffer {
 
-    private final Object lock;
-
     /**
      * Size of an index entry: 4 bytes for record length, 4 bytes for data 
type and 8 bytes for
      * pointer to next entry.
@@ -65,7 +62,6 @@ public class PartitionSortedBuffer implements SortBuffer {
     private final BufferPool bufferPool;
 
     /** A segment list as a joint buffer which stores all records and index 
entries. */
-    @GuardedBy("lock")
     private final ArrayList<MemorySegment> segments = new ArrayList<>();
 
     /** Addresses of the first record's index entry for each subpartition. */
@@ -97,7 +93,6 @@ public class PartitionSortedBuffer implements SortBuffer {
     private boolean isFinished;
 
     /** Whether this sort buffer is released. A released sort buffer can not 
be used. */
-    @GuardedBy("lock")
     private boolean isReleased;
 
     // 
---------------------------------------------------------------------------------------------
@@ -127,7 +122,6 @@ public class PartitionSortedBuffer implements SortBuffer {
     private int readOrderIndex = -1;
 
     public PartitionSortedBuffer(
-            Object lock,
             BufferPool bufferPool,
             int numSubpartitions,
             int bufferSize,
@@ -136,7 +130,6 @@ public class PartitionSortedBuffer implements SortBuffer {
         checkArgument(bufferSize > INDEX_ENTRY_SIZE, "Buffer size is too 
small.");
         checkArgument(numGuaranteedBuffers > 0, "No guaranteed buffers for 
sort.");
 
-        this.lock = checkNotNull(lock);
         this.bufferPool = checkNotNull(bufferPool);
         this.bufferSize = bufferSize;
         this.numGuaranteedBuffers = numGuaranteedBuffers;
@@ -250,19 +243,17 @@ public class PartitionSortedBuffer implements SortBuffer {
     }
 
     private void addBuffer(MemorySegment segment) {
-        synchronized (lock) {
-            if (segment.size() != bufferSize) {
-                bufferPool.recycle(segment);
-                throw new IllegalStateException("Illegal memory segment 
size.");
-            }
-
-            if (isReleased) {
-                bufferPool.recycle(segment);
-                throw new IllegalStateException("Sort buffer is already 
released.");
-            }
+        if (segment.size() != bufferSize) {
+            bufferPool.recycle(segment);
+            throw new IllegalStateException("Illegal memory segment size.");
+        }
 
-            segments.add(segment);
+        if (isReleased) {
+            bufferPool.recycle(segment);
+            throw new IllegalStateException("Sort buffer is already 
released.");
         }
+
+        segments.add(segment);
     }
 
     private MemorySegment requestBufferFromPool() throws IOException {
@@ -436,27 +427,23 @@ public class PartitionSortedBuffer implements SortBuffer {
     @Override
     public void release() {
         // the sort buffer can be released by other threads
-        synchronized (lock) {
-            if (isReleased) {
-                return;
-            }
-
-            isReleased = true;
+        if (isReleased) {
+            return;
+        }
 
-            for (MemorySegment segment : segments) {
-                bufferPool.recycle(segment);
-            }
-            segments.clear();
+        isReleased = true;
 
-            numTotalBytes = 0;
-            numTotalRecords = 0;
+        for (MemorySegment segment : segments) {
+            bufferPool.recycle(segment);
         }
+        segments.clear();
+
+        numTotalBytes = 0;
+        numTotalRecords = 0;
     }
 
     @Override
     public boolean isReleased() {
-        synchronized (lock) {
-            return isReleased;
-        }
+        return isReleased;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index af27a0d..bea424a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -78,10 +78,8 @@ public class SortMergeResultPartition extends 
ResultPartition {
     private PartitionedFile resultFile;
 
     /** Buffers cut from the network buffer pool for data writing. */
-    @GuardedBy("lock")
     private final List<MemorySegment> writeSegments = new ArrayList<>();
 
-    @GuardedBy("lock")
     private boolean hasNotifiedEndOfUserRecords;
 
     /** Size of network buffer and write buffer. */
@@ -173,16 +171,14 @@ public class SortMergeResultPartition extends 
ResultPartition {
         }
         numBuffersForSort = numRequiredBuffer - numWriteBuffers;
 
-        synchronized (lock) {
-            try {
-                for (int i = 0; i < numWriteBuffers; ++i) {
-                    MemorySegment segment = 
bufferPool.requestMemorySegmentBlocking();
-                    writeSegments.add(segment);
-                }
-            } catch (InterruptedException exception) {
-                // the setup method does not allow InterruptedException
-                throw new IOException(exception);
+        try {
+            for (int i = 0; i < numWriteBuffers; ++i) {
+                MemorySegment segment = 
bufferPool.requestMemorySegmentBlocking();
+                writeSegments.add(segment);
             }
+        } catch (InterruptedException exception) {
+            // the setup method does not allow InterruptedException
+            throw new IOException(exception);
         }
 
         LOG.info(
@@ -276,7 +272,6 @@ public class SortMergeResultPartition extends 
ResultPartition {
 
         unicastSortBuffer =
                 new PartitionSortedBuffer(
-                        lock,
                         bufferPool,
                         numSubpartitions,
                         networkBufferSize,
@@ -294,7 +289,6 @@ public class SortMergeResultPartition extends 
ResultPartition {
 
         broadcastSortBuffer =
                 new PartitionSortedBuffer(
-                        lock,
                         bufferPool,
                         numSubpartitions,
                         networkBufferSize,
@@ -343,10 +337,8 @@ public class SortMergeResultPartition extends 
ResultPartition {
     }
 
     private Queue<MemorySegment> getWriteSegments() {
-        synchronized (lock) {
-            checkState(!writeSegments.isEmpty(), "Task has been canceled.");
-            return new ArrayDeque<>(writeSegments);
-        }
+        checkState(!writeSegments.isEmpty(), "Task has been canceled.");
+        return new ArrayDeque<>(writeSegments);
     }
 
     private BufferWithChannel compressBufferIfPossible(BufferWithChannel 
bufferWithChannel) {
@@ -398,11 +390,9 @@ public class SortMergeResultPartition extends 
ResultPartition {
 
     @Override
     public void notifyEndOfData() throws IOException {
-        synchronized (lock) {
-            if (!hasNotifiedEndOfUserRecords) {
-                broadcastEvent(EndOfData.INSTANCE, false);
-                hasNotifiedEndOfUserRecords = true;
-            }
+        if (!hasNotifiedEndOfUserRecords) {
+            broadcastEvent(EndOfData.INSTANCE, false);
+            hasNotifiedEndOfUserRecords = true;
         }
     }
 
@@ -425,13 +415,11 @@ public class SortMergeResultPartition extends 
ResultPartition {
     }
 
     private void releaseWriteBuffers() {
-        synchronized (lock) {
-            if (bufferPool != null) {
-                for (MemorySegment segment : writeSegments) {
-                    bufferPool.recycle(segment);
-                }
-                writeSegments.clear();
+        if (bufferPool != null) {
+            for (MemorySegment segment : writeSegments) {
+                bufferPool.recycle(segment);
             }
+            writeSegments.clear();
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java
index 564f849..fa9c27c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java
@@ -319,8 +319,7 @@ public class PartitionSortedBufferTest {
         BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, 
bufferPoolSize);
 
         SortBuffer sortBuffer =
-                new PartitionSortedBuffer(
-                        new Object(), bufferPool, 1, bufferSize, 
bufferPoolSize, null);
+                new PartitionSortedBuffer(bufferPool, 1, bufferSize, 
bufferPoolSize, null);
         sortBuffer.append(ByteBuffer.allocate(recordSize), 0, 
Buffer.DataType.DATA_BUFFER);
 
         assertEquals(bufferPoolSize, 
bufferPool.bestEffortGetNumOfUsedBuffers());
@@ -348,12 +347,7 @@ public class PartitionSortedBufferTest {
         BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, 
bufferPoolSize);
 
         return new PartitionSortedBuffer(
-                new Object(),
-                bufferPool,
-                numSubpartitions,
-                bufferSize,
-                bufferPoolSize,
-                customReadOrder);
+                bufferPool, numSubpartitions, bufferSize, bufferPoolSize, 
customReadOrder);
     }
 
     public static int[] getRandomSubpartitionOrder(int numSubpartitions) {

Reply via email to