This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 22098b2 [FLINK-17182][network][tests] Fix the unstable
RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle
22098b2 is described below
commit 22098b27c32342f3ef74848a86d4b4d8d3c1dfb8
Author: Yun Gao <[email protected]>
AuthorDate: Fri Jun 12 10:52:08 2020 +0800
[FLINK-17182][network][tests] Fix the unstable
RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle
In this unstable unit test, the exclusive buffers and floating buffers are
recycled by different
threads, which might cause unexpected race condition issue. But actually
they should always be
recycled by the same task thread in practice. So we refactor the test
process to recycle them in
the same thread to avoid potential unnecessary issues.
This closes #11924.
---
.../partition/consumer/RemoteInputChannelTest.java | 63 +++++++++++-----------
1 file changed, 32 insertions(+), 31 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index b280422..3984dde 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -59,6 +59,8 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.Queue;
+import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -862,8 +864,7 @@ public class RemoteInputChannelTest {
// Submit tasks and wait to finish
submitTasksAndWaitForResults(executor, new Callable[]{
- recycleExclusiveBufferTask(inputChannel,
numExclusiveSegments),
- recycleFloatingBufferTask(bufferPool,
numFloatingBuffers),
+ recycleBufferTask(inputChannel, bufferPool,
numExclusiveSegments, numFloatingBuffers),
requestBufferTask});
assertEquals("There should be " +
inputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.",
@@ -912,8 +913,7 @@ public class RemoteInputChannelTest {
// Submit tasks and wait to finish
submitTasksAndWaitForResults(executor, new Callable[]{
- recycleExclusiveBufferTask(inputChannel,
numExclusiveSegments),
- recycleFloatingBufferTask(bufferPool,
numFloatingBuffers),
+ recycleBufferTask(inputChannel, bufferPool,
numExclusiveSegments, numFloatingBuffers),
releaseTask});
assertEquals("There should be no buffers available in
the channel.",
@@ -1222,14 +1222,21 @@ public class RemoteInputChannelTest {
}
/**
- * Requests the exclusive buffers from input channel first and then
recycles them by a callable task.
+ * Requests the buffers from input channel and buffer pool first and
then recycles them by a callable task.
*
* @param inputChannel The input channel that exclusive buffers request
from.
+ * @param bufferPool The buffer pool that floating buffers request from.
* @param numExclusiveSegments The number of exclusive buffers to
request.
- * @return The callable task to recycle exclusive buffers.
+ * @param numFloatingBuffers The number of floating buffers to request.
+ * @return The callable task to recycle exclusive and floating buffers.
*/
- private Callable<Void> recycleExclusiveBufferTask(RemoteInputChannel
inputChannel, int numExclusiveSegments) {
- final List<Buffer> exclusiveBuffers = new
ArrayList<>(numExclusiveSegments);
+ private Callable<Void> recycleBufferTask(
+ RemoteInputChannel inputChannel,
+ BufferPool bufferPool,
+ int numExclusiveSegments,
+ int numFloatingBuffers) throws Exception {
+
+ Queue<Buffer> exclusiveBuffers = new
ArrayDeque<>(numExclusiveSegments);
// Exhaust all the exclusive buffers
for (int i = 0; i < numExclusiveSegments; i++) {
Buffer buffer = inputChannel.requestBuffer();
@@ -1237,27 +1244,7 @@ public class RemoteInputChannelTest {
exclusiveBuffers.add(buffer);
}
- return new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- for (Buffer buffer : exclusiveBuffers) {
- buffer.recycleBuffer();
- }
-
- return null;
- }
- };
- }
-
- /**
- * Requests the floating buffers from pool first and then recycles them
by a callable task.
- *
- * @param bufferPool The buffer pool that floating buffers request from.
- * @param numFloatingBuffers The number of floating buffers to request.
- * @return The callable task to recycle floating buffers.
- */
- private Callable<Void> recycleFloatingBufferTask(BufferPool bufferPool,
int numFloatingBuffers) throws Exception {
- final List<Buffer> floatingBuffers = new
ArrayList<>(numFloatingBuffers);
+ Queue<Buffer> floatingBuffers = new
ArrayDeque<>(numFloatingBuffers);
// Exhaust all the floating buffers
for (int i = 0; i < numFloatingBuffers; i++) {
Buffer buffer = bufferPool.requestBuffer();
@@ -1268,8 +1255,22 @@ public class RemoteInputChannelTest {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
- for (Buffer buffer : floatingBuffers) {
- buffer.recycleBuffer();
+ Random random = new Random();
+
+ while (!exclusiveBuffers.isEmpty() &&
!floatingBuffers.isEmpty()) {
+ if (random.nextBoolean()) {
+
exclusiveBuffers.poll().recycleBuffer();
+ } else {
+
floatingBuffers.poll().recycleBuffer();
+ }
+ }
+
+ while (!exclusiveBuffers.isEmpty()) {
+ exclusiveBuffers.poll().recycleBuffer();
+ }
+
+ while (!floatingBuffers.isEmpty()) {
+ floatingBuffers.poll().recycleBuffer();
}
return null;