This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bf7c1b7 [hotfix][runtime,tests] Add test coverage for BlockingCallMonitoringThreadPool
bf7c1b7 is described below
commit bf7c1b7dc7747cd4031dab10dde7a7729c1aaa56
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 26 11:43:43 2019 +0200
[hotfix][runtime,tests] Add test coverage for BlockingCallMonitoringThreadPool
---
.../BlockingCallMonitoringThreadPool.java | 31 ++++--
.../BlockingCallMonitoringThreadPoolTest.java | 112 +++++++++++++++++++++
2 files changed, 136 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
index 3c541c7..d0fb868 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
@@ -18,10 +18,13 @@
package org.apache.flink.runtime.taskmanager;
+import org.apache.flink.annotation.VisibleForTesting;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -44,6 +47,10 @@ public class BlockingCallMonitoringThreadPool {
private final ThreadPoolExecutor executor;
+ public BlockingCallMonitoringThreadPool() {
+ this(Executors.defaultThreadFactory());
+ }
+
public BlockingCallMonitoringThreadPool(final ThreadFactory
dispatcherThreadFactory) {
this.executor = new ThreadPoolExecutor(
1,
@@ -54,22 +61,22 @@ public class BlockingCallMonitoringThreadPool {
checkNotNull(dispatcherThreadFactory));
}
- public void submit(final Runnable runnable, final boolean blocking) {
+ public CompletableFuture<?> submit(final Runnable runnable, final boolean blocking) {
if (blocking) {
- submitBlocking(runnable);
+ return submitBlocking(runnable);
} else {
- submit(runnable);
+ return submit(runnable);
}
}
- private void submit(final Runnable task) {
+ private CompletableFuture<?> submit(final Runnable task) {
adjustThreadPoolSize(inFlightBlockingCallCounter.get());
- executor.execute(task);
+ return CompletableFuture.runAsync(task, executor);
}
- private void submitBlocking(final Runnable task) {
+ private CompletableFuture<?> submitBlocking(final Runnable task) {
adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
- CompletableFuture.runAsync(task, executor).whenComplete(
+ return CompletableFuture.runAsync(task, executor).whenComplete(
(ignored, e) ->
inFlightBlockingCallCounter.decrementAndGet());
}
@@ -107,4 +114,14 @@ public class BlockingCallMonitoringThreadPool {
public void shutdownNow() {
executor.shutdownNow();
}
+
+ @VisibleForTesting
+ int getMaximumPoolSize() {
+ return executor.getMaximumPoolSize();
+ }
+
+ @VisibleForTesting
+ int getQueueSize() {
+ return executor.getQueue().size();
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
new file mode 100644
index 0000000..2cc3454
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BlockingCallMonitoringThreadPool}.
+ */
+public class BlockingCallMonitoringThreadPoolTest {
+
+ private final static int TIME_OUT = 30;
+
+ private final OneShotLatch latch1 = new OneShotLatch();
+ private final OneShotLatch latch2 = new OneShotLatch();
+ private BlockingCallMonitoringThreadPool blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
+
+ @Before
+ public void setup() {
+ blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
+ latch1.reset();
+ latch2.reset();
+ }
+
+ @After
+ public void tearDown() {
+ latch1.trigger();
+ latch2.trigger();
+ blockingCallThreadPool.shutdown();
+ }
+
+ @Test
+ public void testSubmitNonBlockingCalls() throws Exception {
+ blockingCallThreadPool.submit(() -> await(latch1), false);
+ blockingCallThreadPool.submit(() -> await(latch2), false);
+
+ assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
+ assertEquals(1, blockingCallThreadPool.getQueueSize());
+ }
+
+ @Test
+ public void testSubmitBlockingCall() throws Exception {
+ CompletableFuture<?> latch1Future = blockingCallThreadPool.submit(() -> await(latch1), true);
+ CompletableFuture<?> latch2Future = blockingCallThreadPool.submit(() -> await(latch2), false);
+
+ assertEquals(2, blockingCallThreadPool.getMaximumPoolSize());
+ assertEquals(0, blockingCallThreadPool.getQueueSize());
+
+ latch2.trigger();
+ latch2Future.get(TIME_OUT, TimeUnit.SECONDS);
+
+ assertFalse(latch1Future.isDone());
+ assertTrue(latch2Future.isDone());
+ }
+
+ @Test
+ public void testDownsizePool() throws Exception {
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+
+ futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
+ futures.add(blockingCallThreadPool.submit(() -> await(latch1), true));
+ futures.add(blockingCallThreadPool.submit(() -> await(latch1), false));
+
+ assertEquals(3, blockingCallThreadPool.getMaximumPoolSize());
+
+ latch1.trigger();
+
+ for (CompletableFuture<?> future : futures) {
+ future.get(TIME_OUT, TimeUnit.SECONDS);
+ }
+
+ blockingCallThreadPool.submit(() -> await(latch1), false).get(TIME_OUT, TimeUnit.SECONDS);
+ assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
+ }
+
+ private void await(OneShotLatch latch) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}