Repository: flink
Updated Branches:
  refs/heads/master 8976bf16e -> 3465580e7


[FLINK-3595] [runtime] Eagerly destroy buffer pools on cancelling

This closes #1780.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3465580e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3465580e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3465580e

Branch: refs/heads/master
Commit: 3465580e7918460fb2e911333d0b907cd0980d3d
Parents: 8976bf1
Author: Ufuk Celebi <[email protected]>
Authored: Thu Mar 10 12:02:25 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Apr 4 21:26:50 2016 +0200

----------------------------------------------------------------------
 .../partition/consumer/SingleInputGate.java     |  26 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  47 ++-
 .../buffer/LocalBufferPoolDestroyTest.java      | 144 +++++++++
 .../partition/consumer/SingleInputGateTest.java |  96 ++++++
 .../TaskCancelAsyncProducerConsumerITCase.java  | 310 +++++++++++++++++++
 5 files changed, 608 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index efee27c..bf8bc73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -375,15 +375,19 @@ public class SingleInputGate implements InputGate {
 
        @Override
        public void requestPartitions() throws IOException, 
InterruptedException {
-               // Sanity check
-               if (numberOfInputChannels != inputChannels.size()) {
-                       throw new IllegalStateException("Bug in input gate 
setup logic: mismatch between" +
-                                       "number of total input channels and the 
currently set number of input " +
-                                       "channels.");
-               }
-
                synchronized (requestLock) {
                        if (!requestedPartitionsFlag) {
+                               if (isReleased) {
+                                       throw new 
IllegalStateException("Already released.");
+                               }
+
+                               // Sanity checks
+                               if (numberOfInputChannels != 
inputChannels.size()) {
+                                       throw new IllegalStateException("Bug in 
input gate setup logic: mismatch between" +
+                                                       "number of total input 
channels and the currently set number of input " +
+                                                       "channels.");
+                               }
+
                                for (InputChannel inputChannel : 
inputChannels.values()) {
                                        
inputChannel.requestSubpartition(consumedSubpartitionIndex);
                                }
@@ -404,14 +408,14 @@ public class SingleInputGate implements InputGate {
                        return null;
                }
 
-               if (isReleased) {
-                       throw new IllegalStateException("Already released.");
-               }
-
                requestPartitions();
 
                InputChannel currentChannel = null;
                while (currentChannel == null) {
+                       if (isReleased) {
+                               throw new IllegalStateException("Released");
+                       }
+
                        currentChannel = inputChannelsWithData.poll(2, 
TimeUnit.SECONDS);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b401fc2..1ae0053 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -851,8 +851,14 @@ public class Task implements Runnable {
                                                // because the canceling may 
block on user code, we cancel from a separate thread
                                                // we do not reuse the async 
call handler, because that one may be blocked, in which
                                                // case the canceling could not 
continue
-                                               Runnable canceler = new 
TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask,
-                                                       
taskCancellationInterval);
+                                               Runnable canceler = new 
TaskCanceler(
+                                                               LOG,
+                                                               invokable,
+                                                               executingThread,
+                                                               
taskNameWithSubtask,
+                                                               
taskCancellationInterval,
+                                                               
producedPartitions,
+                                                               inputGates);
                                                Thread cancelThread = new 
Thread(executingThread.getThreadGroup(), canceler,
                                                                "Canceler for " 
+ taskNameWithSubtask);
                                                cancelThread.setDaemon(true);
@@ -1097,14 +1103,25 @@ public class Task implements Runnable {
                private final Thread executer;
                private final String taskName;
                private final long taskCancellationIntervalMillis;
+               private final ResultPartition[] producedPartitions;
+               private final SingleInputGate[] inputGates;
+
+               public TaskCanceler(
+                               Logger logger,
+                               AbstractInvokable invokable,
+                               Thread executer,
+                               String taskName,
+                               long cancelationInterval,
+                               ResultPartition[] producedPartitions,
+                               SingleInputGate[] inputGates) {
 
-               public TaskCanceler(Logger logger, AbstractInvokable invokable,
-                                                       Thread executer, String 
taskName, long cancelationInterval) {
                        this.logger = logger;
                        this.invokable = invokable;
                        this.executer = executer;
                        this.taskName = taskName;
                        this.taskCancellationIntervalMillis = 
cancelationInterval;
+                       this.producedPartitions = producedPartitions;
+                       this.inputGates = inputGates;
                }
 
                @Override
@@ -1119,6 +1136,28 @@ public class Task implements Runnable {
                                        logger.error("Error while canceling the 
task", t);
                                }
 
+                               // Early release of input and output buffer 
pools. We do this
+                               // in order to unblock async Threads, which 
produce/consume the
+                               // intermediate streams outside of the main 
Task Thread.
+                               //
+                               // Don't do this before cancelling the 
invokable. Otherwise we
+                               // will get misleading errors in the logs.
+                               for (ResultPartition partition : 
producedPartitions) {
+                                       try {
+                                               partition.destroyBufferPool();
+                                       } catch (Throwable t) {
+                                               LOG.error("Failed to release 
result partition buffer pool.", t);
+                                       }
+                               }
+
+                               for (SingleInputGate inputGate : inputGates) {
+                                       try {
+                                               inputGate.releaseAllResources();
+                                       } catch (Throwable t) {
+                                               LOG.error("Failed to release 
input gate.", t);
+                                       }
+                               }
+
                                // interrupt the running thread initially
                                executer.interrupt();
                                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
new file mode 100644
index 0000000..18e2136
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class LocalBufferPoolDestroyTest {
+
+       /**
+        * Tests that a blocking request fails properly if the buffer pool is
+        * destroyed.
+        *
+        * <p>Starts a Thread, which triggers an unsatisfiable blocking buffer
+        * request. After making sure that the Thread is actually waiting in the
+        * blocking call, the buffer pool is destroyed and we check whether the
+        * request Thread threw the expected Exception.
+        */
+       @Test
+       public void testDestroyWhileBlockingRequest() throws Exception {
+               AtomicReference<Exception> asyncException = new 
AtomicReference<>();
+
+               NetworkBufferPool networkBufferPool = null;
+               LocalBufferPool localBufferPool = null;
+
+               try {
+                       networkBufferPool = new NetworkBufferPool(1, 4096, 
MemoryType.HEAP);
+                       localBufferPool = new 
LocalBufferPool(networkBufferPool, 1);
+
+                       // Drain buffer pool
+                       assertNotNull(localBufferPool.requestBuffer());
+                       assertNull(localBufferPool.requestBuffer());
+
+                       // Start request Thread
+                       Thread thread = new Thread(new 
BufferRequestTask(localBufferPool, asyncException));
+                       thread.start();
+
+                       // Wait for request
+                       boolean success = false;
+
+                       for (int i = 0; i < 50; i++) {
+                               StackTraceElement[] stackTrace = 
thread.getStackTrace();
+                               success = isInBlockingBufferRequest(stackTrace);
+
+                               if (success) {
+                                       break;
+                               } else {
+                                       // Retry
+                                       Thread.sleep(500);
+                               }
+                       }
+
+                       // Verify that Thread was in blocking request
+                       assertTrue("Did not trigger blocking buffer request.", 
success);
+
+                       // Destroy the buffer pool
+                       localBufferPool.lazyDestroy();
+
+                       // Wait for Thread to finish
+                       thread.join();
+
+                       // Verify expected Exception
+                       assertNotNull("Did not throw expected Exception", 
asyncException.get());
+                       assertTrue(asyncException.get() instanceof 
IllegalStateException);
+               } finally {
+                       if (localBufferPool != null) {
+                               localBufferPool.lazyDestroy();
+                       }
+
+                       if (networkBufferPool != null) {
+                               networkBufferPool.destroyAllBufferPools();
+                               networkBufferPool.destroy();
+                       }
+               }
+       }
+
+       /**
+        * Returns whether the stack trace represents a Thread in a blocking 
buffer
+        * request.
+        *
+        * @param stackTrace Stack trace of the Thread to check
+        *
+        * @return Flag indicating whether the Thread is in a blocking buffer
+        * request or not
+        */
+       private boolean isInBlockingBufferRequest(StackTraceElement[] 
stackTrace) {
+               if (stackTrace.length >= 3) {
+                       return stackTrace[0].getMethodName().equals("wait") &&
+                                       
stackTrace[1].getMethodName().equals("requestBuffer") &&
+                                       
stackTrace[2].getMethodName().equals("requestBufferBlocking");
+               } else {
+                       return false;
+               }
+       }
+
+       /**
+        * Task triggering a blocking buffer request (the test assumes that no
+        * buffer is available).
+        */
+       private static class BufferRequestTask implements Runnable {
+
+               private final BufferPool bufferPool;
+               private final AtomicReference<Exception> asyncException;
+
+               public BufferRequestTask(BufferPool bufferPool, 
AtomicReference<Exception> asyncException) {
+                       this.bufferPool = bufferPool;
+                       this.asyncException = asyncException;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               String msg = "Test assumption violated: 
expected no available buffer";
+                               assertNull(msg, bufferPool.requestBuffer());
+
+                               bufferPool.requestBufferBlocking();
+                       } catch (Exception t) {
+                               asyncException.set(t);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 397abed..c4bb785 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -42,8 +42,10 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -190,6 +192,100 @@ public class SingleInputGateTest {
                                any(ResultPartitionID.class), anyInt(), 
any(BufferProvider.class));
        }
 
+       /**
+        * Tests that the release of the input gate is noticed while polling the
+        * channels for available data.
+        */
+       @Test
+       public void testReleaseWhilePollingChannel() throws Exception {
+               final AtomicReference<Exception> asyncException = new 
AtomicReference<>();
+
+               // Set up the input gate with a single channel that does nothing
+               final SingleInputGate inputGate = new SingleInputGate(
+                               "InputGate",
+                               new JobID(),
+                               new ExecutionAttemptID(),
+                               new IntermediateDataSetID(),
+                               0,
+                               1,
+                               mock(PartitionStateChecker.class));
+
+               InputChannel unknown = new UnknownInputChannel(
+                               inputGate,
+                               0,
+                               new ResultPartitionID(),
+                               new ResultPartitionManager(),
+                               new TaskEventDispatcher(),
+                               new LocalConnectionManager(),
+                               new Tuple2<>(0, 0));
+
+               inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
+
+               // Start the consumer in a separate Thread
+               Thread asyncConsumer = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       inputGate.getNextBufferOrEvent();
+                               } catch (Exception e) {
+                                       asyncException.set(e);
+                               }
+                       }
+               };
+               asyncConsumer.start();
+
+               // Wait for blocking queue poll call and release input gate
+               boolean success = false;
+               for (int i = 0; i < 50; i++) {
+                       if (asyncConsumer != null && asyncConsumer.isAlive()) {
+                               StackTraceElement[] stackTrace = 
asyncConsumer.getStackTrace();
+                               success = isInBlockingQueuePoll(stackTrace);
+                       }
+
+                       if (success) {
+                               break;
+                       } else {
+                               // Retry
+                               Thread.sleep(500);
+                       }
+               }
+
+               // Verify that async consumer is in the blocking queue poll
+               assertTrue("Did not trigger blocking buffer request.", success);
+
+               // Release the input gate
+               inputGate.releaseAllResources();
+
+               // Wait for Thread to finish and verify expected Exceptions. If 
the
+               // input gate status is not properly checked during requests, 
this
+               // call will never return.
+               asyncConsumer.join();
+
+               assertNotNull(asyncException.get());
+               assertEquals(IllegalStateException.class, 
asyncException.get().getClass());
+       }
+
+       /**
+        * Returns whether the stack trace represents a Thread in a blocking 
queue
+        * poll call.
+        *
+        * @param stackTrace Stack trace of the Thread to check
+        *
+        * @return Flag indicating whether the Thread is in a blocking queue 
poll
+        * call.
+        */
+       private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
+               for (StackTraceElement elem : stackTrace) {
+                       if (elem.getMethodName().equals("poll") &&
+                                       
elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
+
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
        // 
---------------------------------------------------------------------------------------------
 
        static void verifyBufferOrEvent(

http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
new file mode 100644
index 0000000..f0e72d7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
+
+       // The Exceptions thrown by the producer/consumer Threads
+       private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
+       private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
+
+       // The Threads producing/consuming the intermediate stream
+       private static volatile Thread ASYNC_PRODUCER_THREAD;
+       private static volatile Thread ASYNC_CONSUMER_THREAD;
+
+       /**
+        * Tests that a task waiting on an async producer/consumer that is stuck
+        * in a blocking buffer request can be properly cancelled.
+        *
+        * <p>This is currently required for the Flink Kafka sources, which 
spawn
+        * a separate Thread consuming from Kafka and producing the intermediate
+        * streams in the spawned Thread instead of the main task Thread.
+        */
+       @Test
+       public void testCancelAsyncProducerAndConsumer() throws Exception {
+               Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
+               TestingCluster flink = null;
+
+               try {
+                       // Cluster
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 8);
+
+                       flink = new TestingCluster(config, true);
+                       flink.start();
+
+                       // Job with async producer and consumer
+                       JobVertex producer = new JobVertex("AsyncProducer");
+                       producer.setParallelism(1);
+                       producer.setInvokableClass(AsyncProducer.class);
+
+                       JobVertex consumer = new JobVertex("AsyncConsumer");
+                       consumer.setParallelism(1);
+                       consumer.setInvokableClass(AsyncConsumer.class);
+                       consumer.connectNewDataSetAsInput(producer, 
DistributionPattern.POINTWISE);
+
+                       SlotSharingGroup slot = new 
SlotSharingGroup(producer.getID(), consumer.getID());
+                       producer.setSlotSharingGroup(slot);
+                       consumer.setSlotSharingGroup(slot);
+
+                       JobGraph jobGraph = new JobGraph(new ExecutionConfig(), 
producer, consumer);
+
+                       // Submit job and wait until running
+                       ActorGateway jobManager = 
flink.getLeaderGateway(deadline.timeLeft());
+                       flink.submitJobDetached(jobGraph);
+
+                       Object msg = new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+                       Future<?> runningFuture = jobManager.ask(msg, 
deadline.timeLeft());
+                       Await.ready(runningFuture, deadline.timeLeft());
+
+                       // Wait for blocking requests, cancel and wait for 
cancellation
+                       msg = new NotifyWhenJobStatus(jobGraph.getJobID(), 
JobStatus.CANCELED);
+                       Future<?> cancelledFuture = jobManager.ask(msg, 
deadline.timeLeft());
+
+                       boolean producerBlocked = false;
+                       for (int i = 0; i < 50; i++) {
+                               Thread thread = ASYNC_PRODUCER_THREAD;
+
+                               if (thread != null && thread.isAlive()) {
+                                       StackTraceElement[] stackTrace = 
thread.getStackTrace();
+                                       producerBlocked = 
isInBlockingBufferRequest(stackTrace);
+                               }
+
+                               if (producerBlocked) {
+                                       break;
+                               } else {
+                                       // Retry
+                                       Thread.sleep(500);
+                               }
+                       }
+
+                       // Verify that async producer is in blocking request
+                       assertTrue("Producer thread is not blocked.", 
producerBlocked);
+
+                       boolean consumerBlocked = false;
+                       for (int i = 0; i < 50; i++) {
+                               Thread thread = ASYNC_CONSUMER_THREAD;
+
+                               if (thread != null && thread.isAlive()) {
+                                       StackTraceElement[] stackTrace = 
thread.getStackTrace();
+                                       consumerBlocked = 
isInBlockingQueuePoll(stackTrace);
+                               }
+
+                               if (consumerBlocked) {
+                                       break;
+                               } else {
+                                       // Retry
+                                       Thread.sleep(500);
+                               }
+                       }
+
+                       // Verify that async consumer is in the blocking queue poll
+                       assertTrue("Consumer thread is not blocked.", 
consumerBlocked);
+
+                       msg = new CancelJob(jobGraph.getJobID());
+                       Future<?> cancelFuture = jobManager.ask(msg, 
deadline.timeLeft());
+                       Await.ready(cancelFuture, deadline.timeLeft());
+
+                       Await.ready(cancelledFuture, deadline.timeLeft());
+
+                       // Verify the expected Exceptions
+                       assertNotNull(ASYNC_PRODUCER_EXCEPTION);
+                       assertEquals(IllegalStateException.class, 
ASYNC_PRODUCER_EXCEPTION.getClass());
+
+                       assertNotNull(ASYNC_CONSUMER_EXCEPTION);
+                       assertEquals(IllegalStateException.class, 
ASYNC_CONSUMER_EXCEPTION.getClass());
+               } finally {
+                       if (flink != null) {
+                               flink.shutdown();
+                       }
+               }
+       }
+
+       /**
+        * Returns whether the stack trace represents a Thread in a blocking 
buffer
+        * request.
+        *
+        * @param stackTrace Stack trace of the Thread to check
+        *
+        * @return Flag indicating whether the Thread is in a blocking buffer
+        * request or not
+        */
+       private boolean isInBlockingBufferRequest(StackTraceElement[] 
stackTrace) {
+               return stackTrace.length >= 3 && 
stackTrace[0].getMethodName().equals("wait") &&
+                               
stackTrace[1].getMethodName().equals("requestBuffer") &&
+                               
stackTrace[2].getMethodName().equals("requestBufferBlocking");
+       }
+
+       /**
+        * Returns whether the stack trace represents a Thread in a blocking 
queue
+        * poll call.
+        *
+        * @param stackTrace Stack trace of the Thread to check
+        *
+        * @return Flag indicating whether the Thread is in a blocking queue poll
+        * call.
+        */
+       private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
+               for (StackTraceElement elem : stackTrace) {
+                       if (elem.getMethodName().equals("poll") &&
+                                       
elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
+
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
+       /**
+        * Invokable emitting records in a separate Thread (not the main Task
+        * thread).
+        */
+       public static class AsyncProducer extends AbstractInvokable {
+
+               /**
+                * Starts the emitting Thread, publishes it for the test and then
+                * blocks until that Thread has terminated.
+                */
+               @Override
+               public void invoke() throws Exception {
+                       final ResultPartitionWriter partitionWriter = getEnvironment().getWriter(0);
+                       final Thread emitter = new ProducerThread(partitionWriter);
+
+                       // Expose the async producer to the main test Thread before it runs
+                       ASYNC_PRODUCER_THREAD = emitter;
+                       emitter.start();
+
+                       // This runs in the main Task thread, which is interrupted on
+                       // cancellation. Interrupts are dropped on purpose so that we
+                       // keep waiting until the emitter has actually finished.
+                       while (emitter.isAlive()) {
+                               try {
+                                       emitter.join();
+                               } catch (InterruptedException ignored) {
+                               }
+                       }
+               }
+
+               /**
+                * Thread that emits and flushes an ever increasing counter until the
+                * writer fails.
+                */
+               private static class ProducerThread extends Thread {
+
+                       private final RecordWriter<LongValue> writer;
+
+                       public ProducerThread(ResultPartitionWriter partitionWriter) {
+                               this.writer = new RecordWriter<>(partitionWriter);
+                       }
+
+                       @Override
+                       public void run() {
+                               final LongValue record = new LongValue(0);
+
+                               try {
+                                       // Only an Exception ends this loop
+                                       for (;;) {
+                                               final long next = record.getValue() + 1;
+                                               record.setValue(next);
+
+                                               writer.emit(record);
+                                               writer.flush();
+                                       }
+                               } catch (Exception failure) {
+                                       // Hand the failure over to the test for verification
+                                       ASYNC_PRODUCER_EXCEPTION = failure;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Invokable consuming buffers in a separate Thread (not the main Task
+        * thread).
+        */
+       public static class AsyncConsumer extends AbstractInvokable {
+
+               /**
+                * Starts the consuming Thread, publishes it for the test and then
+                * blocks until that Thread has terminated.
+                */
+               @Override
+               public void invoke() throws Exception {
+                       final InputGate gate = getEnvironment().getInputGate(0);
+                       final Thread reader = new ConsumerThread(gate);
+
+                       // Expose the async consumer to the main test Thread before it runs
+                       ASYNC_CONSUMER_THREAD = reader;
+                       reader.start();
+
+                       // This runs in the main Task thread, which is interrupted on
+                       // cancellation. Interrupts are dropped on purpose so that we
+                       // keep waiting until the reader has actually finished.
+                       while (reader.isAlive()) {
+                               try {
+                                       reader.join();
+                               } catch (InterruptedException ignored) {
+                               }
+                       }
+               }
+
+               /**
+                * Thread that keeps requesting the next buffer or event from the
+                * input gate until the gate fails.
+                */
+               private static class ConsumerThread extends Thread {
+
+                       private final InputGate gate;
+
+                       public ConsumerThread(InputGate inputGate) {
+                               this.gate = inputGate;
+                       }
+
+                       @Override
+                       public void run() {
+                               try {
+                                       // Only an Exception ends this loop
+                                       for (;;) {
+                                               gate.getNextBufferOrEvent();
+                                       }
+                               } catch (Exception failure) {
+                                       // Hand the failure over to the test for verification
+                                       ASYNC_CONSUMER_EXCEPTION = failure;
+                               }
+                       }
+               }
+       }
+}

Reply via email to