This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bf7c1b7  [hotfix][runtime,tests] Add test coverage for 
BlockingCallMonitoringThreadPool
bf7c1b7 is described below

commit bf7c1b7dc7747cd4031dab10dde7a7729c1aaa56
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 26 11:43:43 2019 +0200

    [hotfix][runtime,tests] Add test coverage for 
BlockingCallMonitoringThreadPool
---
 .../BlockingCallMonitoringThreadPool.java          |  31 ++++--
 .../BlockingCallMonitoringThreadPoolTest.java      | 112 +++++++++++++++++++++
 2 files changed, 136 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
index 3c541c7..d0fb868 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -44,6 +47,10 @@ public class BlockingCallMonitoringThreadPool {
 
        private final ThreadPoolExecutor executor;
 
+       public BlockingCallMonitoringThreadPool() {
+               this(Executors.defaultThreadFactory());
+       }
+
        public BlockingCallMonitoringThreadPool(final ThreadFactory 
dispatcherThreadFactory) {
                this.executor = new ThreadPoolExecutor(
                                1,
@@ -54,22 +61,22 @@ public class BlockingCallMonitoringThreadPool {
                                checkNotNull(dispatcherThreadFactory));
        }
 
-       public void submit(final Runnable runnable, final boolean blocking) {
+       public CompletableFuture<?> submit(final Runnable runnable, final 
boolean blocking) {
                if (blocking) {
-                       submitBlocking(runnable);
+                       return submitBlocking(runnable);
                } else {
-                       submit(runnable);
+                       return submit(runnable);
                }
        }
 
-       private void submit(final Runnable task) {
+       private CompletableFuture<?> submit(final Runnable task) {
                adjustThreadPoolSize(inFlightBlockingCallCounter.get());
-               executor.execute(task);
+               return CompletableFuture.runAsync(task, executor);
        }
 
-       private void submitBlocking(final Runnable task) {
+       private CompletableFuture<?> submitBlocking(final Runnable task) {
                
adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
-               CompletableFuture.runAsync(task, executor).whenComplete(
+               return CompletableFuture.runAsync(task, executor).whenComplete(
                                (ignored, e) -> 
inFlightBlockingCallCounter.decrementAndGet());
        }
 
@@ -107,4 +114,14 @@ public class BlockingCallMonitoringThreadPool {
        public void shutdownNow() {
                executor.shutdownNow();
        }
+
+       @VisibleForTesting
+       int getMaximumPoolSize() {
+               return executor.getMaximumPoolSize();
+       }
+
+       @VisibleForTesting
+       int getQueueSize() {
+               return executor.getQueue().size();
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
new file mode 100644
index 0000000..2cc3454
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BlockingCallMonitoringThreadPool}.
+ */
+public class BlockingCallMonitoringThreadPoolTest {
+
+       private final static int TIME_OUT = 30;
+
+       private final OneShotLatch latch1 = new OneShotLatch();
+       private final OneShotLatch latch2 = new OneShotLatch();
+       private BlockingCallMonitoringThreadPool blockingCallThreadPool = new 
BlockingCallMonitoringThreadPool();
+
+       @Before
+       public void setup() {
+               blockingCallThreadPool = new BlockingCallMonitoringThreadPool();
+               latch1.reset();
+               latch2.reset();
+       }
+
+       @After
+       public void tearDown() {
+               latch1.trigger();
+               latch2.trigger();
+               blockingCallThreadPool.shutdown();
+       }
+
+       @Test
+       public void testSubmitNonBlockingCalls() throws Exception {
+               blockingCallThreadPool.submit(() -> await(latch1), false);
+               blockingCallThreadPool.submit(() -> await(latch2), false);
+
+               assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
+               assertEquals(1, blockingCallThreadPool.getQueueSize());
+       }
+
+       @Test
+       public void testSubmitBlockingCall() throws Exception {
+               CompletableFuture<?> latch1Future = 
blockingCallThreadPool.submit(() -> await(latch1), true);
+               CompletableFuture<?> latch2Future = 
blockingCallThreadPool.submit(() -> await(latch2), false);
+
+               assertEquals(2, blockingCallThreadPool.getMaximumPoolSize());
+               assertEquals(0, blockingCallThreadPool.getQueueSize());
+
+               latch2.trigger();
+               latch2Future.get(TIME_OUT, TimeUnit.SECONDS);
+
+               assertFalse(latch1Future.isDone());
+               assertTrue(latch2Future.isDone());
+       }
+
+       @Test
+       public void testDownsizePool() throws Exception {
+               List<CompletableFuture<?>> futures = new ArrayList<>();
+
+               futures.add(blockingCallThreadPool.submit(() -> await(latch1), 
true));
+               futures.add(blockingCallThreadPool.submit(() -> await(latch1), 
true));
+               futures.add(blockingCallThreadPool.submit(() -> await(latch1), 
false));
+
+               assertEquals(3, blockingCallThreadPool.getMaximumPoolSize());
+
+               latch1.trigger();
+
+               for (CompletableFuture<?> future : futures) {
+                       future.get(TIME_OUT, TimeUnit.SECONDS);
+               }
+
+               blockingCallThreadPool.submit(() -> await(latch1), 
false).get(TIME_OUT, TimeUnit.SECONDS);
+               assertEquals(1, blockingCallThreadPool.getMaximumPoolSize());
+       }
+
+       private void await(OneShotLatch latch) {
+               try {
+                       latch.await();
+               } catch (InterruptedException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}

Reply via email to