This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 22098b2  [FLINK-17182][network][tests] Fix the unstable 
RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle
22098b2 is described below

commit 22098b27c32342f3ef74848a86d4b4d8d3c1dfb8
Author: Yun Gao <[email protected]>
AuthorDate: Fri Jun 12 10:52:08 2020 +0800

    [FLINK-17182][network][tests] Fix the unstable 
RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle
    
    In this unstable unit test, the exclusive buffers and floating buffers are 
recycled by different
    threads, which might cause unexpected race condition issue. But actually 
they should always be
    recycled by the same task thread in practice. So we refactor the test 
process to recycle them in
    the same thread to avoid potential unnecessary issues.
    
    This closes #11924.
---
 .../partition/consumer/RemoteInputChannelTest.java | 63 +++++++++++-----------
 1 file changed, 32 insertions(+), 31 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index b280422..3984dde 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -59,6 +59,8 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -862,8 +864,7 @@ public class RemoteInputChannelTest {
 
                        // Submit tasks and wait to finish
                        submitTasksAndWaitForResults(executor, new Callable[]{
-                               recycleExclusiveBufferTask(inputChannel, 
numExclusiveSegments),
-                               recycleFloatingBufferTask(bufferPool, 
numFloatingBuffers),
+                               recycleBufferTask(inputChannel, bufferPool, 
numExclusiveSegments, numFloatingBuffers),
                                requestBufferTask});
 
                        assertEquals("There should be " + 
inputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.",
@@ -912,8 +913,7 @@ public class RemoteInputChannelTest {
 
                        // Submit tasks and wait to finish
                        submitTasksAndWaitForResults(executor, new Callable[]{
-                               recycleExclusiveBufferTask(inputChannel, 
numExclusiveSegments),
-                               recycleFloatingBufferTask(bufferPool, 
numFloatingBuffers),
+                               recycleBufferTask(inputChannel, bufferPool, 
numExclusiveSegments, numFloatingBuffers),
                                releaseTask});
 
                        assertEquals("There should be no buffers available in 
the channel.",
@@ -1222,14 +1222,21 @@ public class RemoteInputChannelTest {
        }
 
        /**
-        * Requests the exclusive buffers from input channel first and then 
recycles them by a callable task.
+        * Requests the buffers from input channel and buffer pool first and 
then recycles them by a callable task.
         *
         * @param inputChannel The input channel that exclusive buffers request 
from.
+        * @param bufferPool The buffer pool that floating buffers request from.
         * @param numExclusiveSegments The number of exclusive buffers to 
request.
-        * @return The callable task to recycle exclusive buffers.
+        * @param numFloatingBuffers The number of floating buffers to request.
+        * @return The callable task to recycle exclusive and floating buffers.
         */
-       private Callable<Void> recycleExclusiveBufferTask(RemoteInputChannel 
inputChannel, int numExclusiveSegments) {
-               final List<Buffer> exclusiveBuffers = new 
ArrayList<>(numExclusiveSegments);
+       private Callable<Void> recycleBufferTask(
+               RemoteInputChannel inputChannel,
+               BufferPool bufferPool,
+               int numExclusiveSegments,
+               int numFloatingBuffers) throws Exception {
+
+               Queue<Buffer> exclusiveBuffers = new 
ArrayDeque<>(numExclusiveSegments);
                // Exhaust all the exclusive buffers
                for (int i = 0; i < numExclusiveSegments; i++) {
                        Buffer buffer = inputChannel.requestBuffer();
@@ -1237,27 +1244,7 @@ public class RemoteInputChannelTest {
                        exclusiveBuffers.add(buffer);
                }
 
-               return new Callable<Void>() {
-                       @Override
-                       public Void call() throws Exception {
-                               for (Buffer buffer : exclusiveBuffers) {
-                                       buffer.recycleBuffer();
-                               }
-
-                               return null;
-                       }
-               };
-       }
-
-       /**
-        * Requests the floating buffers from pool first and then recycles them 
by a callable task.
-        *
-        * @param bufferPool The buffer pool that floating buffers request from.
-        * @param numFloatingBuffers The number of floating buffers to request.
-        * @return The callable task to recycle floating buffers.
-        */
-       private Callable<Void> recycleFloatingBufferTask(BufferPool bufferPool, 
int numFloatingBuffers) throws Exception {
-               final List<Buffer> floatingBuffers = new 
ArrayList<>(numFloatingBuffers);
+               Queue<Buffer> floatingBuffers = new 
ArrayDeque<>(numFloatingBuffers);
                // Exhaust all the floating buffers
                for (int i = 0; i < numFloatingBuffers; i++) {
                        Buffer buffer = bufferPool.requestBuffer();
@@ -1268,8 +1255,22 @@ public class RemoteInputChannelTest {
                return new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
-                               for (Buffer buffer : floatingBuffers) {
-                                       buffer.recycleBuffer();
+                               Random random = new Random();
+
+                               while (!exclusiveBuffers.isEmpty() && 
!floatingBuffers.isEmpty()) {
+                                       if (random.nextBoolean()) {
+                                               
exclusiveBuffers.poll().recycleBuffer();
+                                       } else {
+                                               
floatingBuffers.poll().recycleBuffer();
+                                       }
+                               }
+
+                               while (!exclusiveBuffers.isEmpty()) {
+                                       exclusiveBuffers.poll().recycleBuffer();
+                               }
+
+                               while (!floatingBuffers.isEmpty()) {
+                                       floatingBuffers.poll().recycleBuffer();
                                }
 
                                return null;

Reply via email to