Repository: flink Updated Branches: refs/heads/master 8976bf16e -> 3465580e7
[FLINK-3595] [runtime] Eagerly destroy buffer pools on cancelling This closes #1780. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3465580e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3465580e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3465580e Branch: refs/heads/master Commit: 3465580e7918460fb2e911333d0b907cd0980d3d Parents: 8976bf1 Author: Ufuk Celebi <[email protected]> Authored: Thu Mar 10 12:02:25 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Apr 4 21:26:50 2016 +0200 ---------------------------------------------------------------------- .../partition/consumer/SingleInputGate.java | 26 +- .../apache/flink/runtime/taskmanager/Task.java | 47 ++- .../buffer/LocalBufferPoolDestroyTest.java | 144 +++++++++ .../partition/consumer/SingleInputGateTest.java | 96 ++++++ .../TaskCancelAsyncProducerConsumerITCase.java | 310 +++++++++++++++++++ 5 files changed, 608 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index efee27c..bf8bc73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -375,15 +375,19 @@ public class SingleInputGate implements InputGate { @Override public void requestPartitions() throws IOException, InterruptedException { - // Sanity check - if (numberOfInputChannels != inputChannels.size()) { - throw new IllegalStateException("Bug in input gate setup logic: mismatch between" + - "number of total input channels and the currently set number of input " + - "channels."); - } - synchronized (requestLock) { if (!requestedPartitionsFlag) { + if (isReleased) { + throw new IllegalStateException("Already released."); + } + + // Sanity checks + if (numberOfInputChannels != inputChannels.size()) { + throw new IllegalStateException("Bug in input gate setup logic: mismatch between" + + "number of total input channels and the currently set number of input " + + "channels."); + } + for (InputChannel inputChannel : inputChannels.values()) { inputChannel.requestSubpartition(consumedSubpartitionIndex); } @@ -404,14 +408,14 @@ public class SingleInputGate implements InputGate { return null; } - if (isReleased) { - throw new IllegalStateException("Already released."); - } - requestPartitions(); InputChannel currentChannel = null; while (currentChannel == null) { + if (isReleased) { + throw new IllegalStateException("Released"); + } + currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS); } http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b401fc2..1ae0053 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -851,8 +851,14 @@ public class Task implements Runnable { // because the canceling may block on user code, we cancel from a separate thread // we do not reuse the async call handler, because that one may be blocked, in which // case the canceling could not continue - Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask, - taskCancellationInterval); + Runnable canceler = new TaskCanceler( + LOG, + invokable, + executingThread, + taskNameWithSubtask, + taskCancellationInterval, + producedPartitions, + inputGates); Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler, "Canceler for " + taskNameWithSubtask); cancelThread.setDaemon(true); @@ -1097,14 +1103,25 @@ public class Task implements Runnable { private final Thread executer; private final String taskName; private final long taskCancellationIntervalMillis; + private final ResultPartition[] producedPartitions; + private final SingleInputGate[] inputGates; + + public TaskCanceler( + Logger logger, + AbstractInvokable invokable, + Thread executer, + String taskName, + long cancelationInterval, + ResultPartition[] producedPartitions, + SingleInputGate[] inputGates) { - public TaskCanceler(Logger logger, AbstractInvokable invokable, - Thread executer, String taskName, long cancelationInterval) { this.logger = logger; this.invokable = invokable; this.executer = executer; this.taskName = taskName; this.taskCancellationIntervalMillis = cancelationInterval; + this.producedPartitions = producedPartitions; + this.inputGates = inputGates; } @Override @@ -1119,6 +1136,28 @@ public class Task implements Runnable { logger.error("Error while canceling the task", t); } + // Early release of input and output buffer pools. We do this + // in order to unblock async Threads, which produce/consume the + // intermediate streams outside of the main Task Thread. + // + // Don't do this before cancelling the invokable. Otherwise we + // will get misleading errors in the logs. + for (ResultPartition partition : producedPartitions) { + try { + partition.destroyBufferPool(); + } catch (Throwable t) { + LOG.error("Failed to release result partition buffer pool.", t); + } + } + + for (SingleInputGate inputGate : inputGates) { + try { + inputGate.releaseAllResources(); + } catch (Throwable t) { + LOG.error("Failed to release input gate.", t); + } + } + // interrupt the running thread initially executer.interrupt(); try { http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java new file mode 100644 index 0000000..18e2136 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemoryType; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class LocalBufferPoolDestroyTest { + + /** + * Tests that a blocking request fails properly if the buffer pool is + * destroyed. + * + * <p>Starts a Thread, which triggers an unsatisfiable blocking buffer + * request. After making sure that the Thread is actually waiting in the + * blocking call, the buffer pool is destroyed and we check whether the + * request Thread threw the expected Exception. + */ + @Test + public void testDestroyWhileBlockingRequest() throws Exception { + AtomicReference<Exception> asyncException = new AtomicReference<>(); + + NetworkBufferPool networkBufferPool = null; + LocalBufferPool localBufferPool = null; + + try { + networkBufferPool = new NetworkBufferPool(1, 4096, MemoryType.HEAP); + localBufferPool = new LocalBufferPool(networkBufferPool, 1); + + // Drain buffer pool + assertNotNull(localBufferPool.requestBuffer()); + assertNull(localBufferPool.requestBuffer()); + + // Start request Thread + Thread thread = new Thread(new BufferRequestTask(localBufferPool, asyncException)); + thread.start(); + + // Wait for request + boolean success = false; + + for (int i = 0; i < 50; i++) { + StackTraceElement[] stackTrace = thread.getStackTrace(); + success = isInBlockingBufferRequest(stackTrace); + + if (success) { + break; + } else { + // Retry + Thread.sleep(500); + } + } + + // Verify that Thread was in blocking request + assertTrue("Did not trigger blocking buffer request.", success); + + // Destroy the buffer pool + localBufferPool.lazyDestroy(); + + // Wait for Thread to finish + thread.join(); + + // Verify expected Exception + assertNotNull("Did not throw expected Exception", asyncException.get()); + assertTrue(asyncException.get() instanceof IllegalStateException); + } finally { + if (localBufferPool != null) { + localBufferPool.lazyDestroy(); + } + + if (networkBufferPool != null) { + networkBufferPool.destroyAllBufferPools(); + networkBufferPool.destroy(); + } + } + } + + /** + * Returns whether the stack trace represents a Thread in a blocking buffer + * request. + * + * @param stackTrace Stack trace of the Thread to check + * + * @return Flag indicating whether the Thread is in a blocking buffer + * request or not + */ + private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) { + if (stackTrace.length >= 3) { + return stackTrace[0].getMethodName().equals("wait") && + stackTrace[1].getMethodName().equals("requestBuffer") && + stackTrace[2].getMethodName().equals("requestBufferBlocking"); + } else { + return false; + } + } + + /** + * Task triggering a blocking buffer request (the test assumes that no + * buffer is available). + */ + private static class BufferRequestTask implements Runnable { + + private final BufferPool bufferPool; + private final AtomicReference<Exception> asyncException; + + public BufferRequestTask(BufferPool bufferPool, AtomicReference<Exception> asyncException) { + this.bufferPool = bufferPool; + this.asyncException = asyncException; + } + + @Override + public void run() { + try { + String msg = "Test assumption violated: expected no available buffer"; + assertNull(msg, bufferPool.requestBuffer()); + + bufferPool.requestBufferBlocking(); + } catch (Exception t) { + asyncException.set(t); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 397abed..c4bb785 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -42,8 +42,10 @@ import org.junit.Test; import scala.Tuple2; import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -190,6 +192,100 @@ public class SingleInputGateTest { any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)); } + /** + * Tests that the release of the input gate is noticed while polling the + * channels for available data. + */ + @Test + public void testReleaseWhilePollingChannel() throws Exception { + final AtomicReference<Exception> asyncException = new AtomicReference<>(); + + // Setup the input gate with a single channel that does nothing + final SingleInputGate inputGate = new SingleInputGate( + "InputGate", + new JobID(), + new ExecutionAttemptID(), + new IntermediateDataSetID(), + 0, + 1, + mock(PartitionStateChecker.class)); + + InputChannel unknown = new UnknownInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new LocalConnectionManager(), + new Tuple2<>(0, 0)); + + inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); + + // Start the consumer in a separate Thread + Thread asyncConsumer = new Thread() { + @Override + public void run() { + try { + inputGate.getNextBufferOrEvent(); + } catch (Exception e) { + asyncException.set(e); + } + } + }; + asyncConsumer.start(); + + // Wait for blocking queue poll call and release input gate + boolean success = false; + for (int i = 0; i < 50; i++) { + if (asyncConsumer != null && asyncConsumer.isAlive()) { + StackTraceElement[] stackTrace = asyncConsumer.getStackTrace(); + success = isInBlockingQueuePoll(stackTrace); + } + + if (success) { + break; + } else { + // Retry + Thread.sleep(500); + } + } + + // Verify that async consumer is in blocking request + assertTrue("Did not trigger blocking buffer request.", success); + + // Release the input gate + inputGate.releaseAllResources(); + + // Wait for Thread to finish and verify expected Exceptions. If the + // input gate status is not properly checked during requests, this + // call will never return. + asyncConsumer.join(); + + assertNotNull(asyncException.get()); + assertEquals(IllegalStateException.class, asyncException.get().getClass()); + } + + /** + * Returns whether the stack trace represents a Thread in a blocking queue + * poll call. + * + * @param stackTrace Stack trace of the Thread to check + * + * @return Flag indicating whether the Thread is in a blocking queue poll + * call. + */ + private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) { + for (StackTraceElement elem : stackTrace) { + if (elem.getMethodName().equals("poll") && + elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) { + + return true; + } + } + + return false; + } + // --------------------------------------------------------------------------------------------- static void verifyBufferOrEvent( http://git-wip-us.apache.org/repos/asf/flink/blob/3465580e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java new file mode 100644 index 0000000..f0e72d7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { + + // The Exceptions thrown by the producer/consumer Threads + private static volatile Exception ASYNC_PRODUCER_EXCEPTION; + private static volatile Exception ASYNC_CONSUMER_EXCEPTION; + + // The Threads producing/consuming the intermediate stream + private static volatile Thread ASYNC_PRODUCER_THREAD; + private static volatile Thread ASYNC_CONSUMER_THREAD; + + /** + * Tests that a task waiting on an async producer/consumer that is stuck + * in a blocking buffer request can be properly cancelled. + * + * <p>This is currently required for the Flink Kafka sources, which spawn + * a separate Thread consuming from Kafka and producing the intermediate + * streams in the spawned Thread instead of the main task Thread. + */ + @Test + public void testCancelAsyncProducerAndConsumer() throws Exception { + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + TestingCluster flink = null; + + try { + // Cluster + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096); + config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 8); + + flink = new TestingCluster(config, true); + flink.start(); + + // Job with async producer and consumer + JobVertex producer = new JobVertex("AsyncProducer"); + producer.setParallelism(1); + producer.setInvokableClass(AsyncProducer.class); + + JobVertex consumer = new JobVertex("AsyncConsumer"); + consumer.setParallelism(1); + consumer.setInvokableClass(AsyncConsumer.class); + consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE); + + SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID()); + producer.setSlotSharingGroup(slot); + consumer.setSlotSharingGroup(slot); + + JobGraph jobGraph = new JobGraph(new ExecutionConfig(), producer, consumer); + + // Submit job and wait until running + ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft()); + flink.submitJobDetached(jobGraph); + + Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID()); + Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft()); + Await.ready(runningFuture, deadline.timeLeft()); + + // Wait for blocking requests, cancel and wait for cancellation + msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED); + Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft()); + + boolean producerBlocked = false; + for (int i = 0; i < 50; i++) { + Thread thread = ASYNC_PRODUCER_THREAD; + + if (thread != null && thread.isAlive()) { + StackTraceElement[] stackTrace = thread.getStackTrace(); + producerBlocked = isInBlockingBufferRequest(stackTrace); + } + + if (producerBlocked) { + break; + } else { + // Retry + Thread.sleep(500); + } + } + + // Verify that async producer is in blocking request + assertTrue("Producer thread is not blocked.", producerBlocked); + + boolean consumerBlocked = false; + for (int i = 0; i < 50; i++) { + Thread thread = ASYNC_CONSUMER_THREAD; + + if (thread != null && thread.isAlive()) { + StackTraceElement[] stackTrace = thread.getStackTrace(); + consumerBlocked = isInBlockingQueuePoll(stackTrace); + } + + if (consumerBlocked) { + break; + } else { + // Retry + Thread.sleep(500); + } + } + + // Verify that async consumer is in blocking request + assertTrue("Consumer thread is not blocked.", consumerBlocked); + + msg = new CancelJob(jobGraph.getJobID()); + Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft()); + Await.ready(cancelFuture, deadline.timeLeft()); + + Await.ready(cancelledFuture, deadline.timeLeft()); + + // Verify the expected Exceptions + assertNotNull(ASYNC_PRODUCER_EXCEPTION); + assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass()); + + assertNotNull(ASYNC_CONSUMER_EXCEPTION); + assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass()); + } finally { + if (flink != null) { + flink.shutdown(); + } + } + } + + /** + * Returns whether the stack trace represents a Thread in a blocking buffer + * request. + * + * @param stackTrace Stack trace of the Thread to check + * + * @return Flag indicating whether the Thread is in a blocking buffer + * request or not + */ + private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) { + return stackTrace.length >= 3 && stackTrace[0].getMethodName().equals("wait") && + stackTrace[1].getMethodName().equals("requestBuffer") && + stackTrace[2].getMethodName().equals("requestBufferBlocking"); + } + + /** + * Returns whether the stack trace represents a Thread in a blocking queue + * poll call. + * + * @param stackTrace Stack trace of the Thread to check + * + * @return Flag indicating whether the Thread is in a blocking queue poll + * call. + */ + private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) { + for (StackTraceElement elem : stackTrace) { + if (elem.getMethodName().equals("poll") && + elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) { + + return true; + } + } + + return false; + } + + /** + * Invokable emitting records in a separate Thread (not the main Task + * thread). + */ + public static class AsyncProducer extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + Thread producer = new ProducerThread(getEnvironment().getWriter(0)); + + // Publish the async producer for the main test Thread + ASYNC_PRODUCER_THREAD = producer; + + producer.start(); + + // Wait for the producer Thread to finish. This is executed in the + // main Task thread and will be interrupted on cancellation. + while (producer.isAlive()) { + try { + producer.join(); + } catch (InterruptedException ignored) { + } + } + } + + /** + * The Thread emitting the records. + */ + private static class ProducerThread extends Thread { + + private final RecordWriter<LongValue> recordWriter; + + public ProducerThread(ResultPartitionWriter partitionWriter) { + this.recordWriter = new RecordWriter<>(partitionWriter); + } + + @Override + public void run() { + LongValue current = new LongValue(0); + + try { + while (true) { + current.setValue(current.getValue() + 1); + recordWriter.emit(current); + recordWriter.flush(); + } + } catch (Exception e) { + ASYNC_PRODUCER_EXCEPTION = e; + } + } + } + } + + /** + * Invokable consuming buffers in a separate Thread (not the main Task + * thread). + */ + public static class AsyncConsumer extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + Thread consumer = new ConsumerThread(getEnvironment().getInputGate(0)); + + // Publish the async consumer for the main test Thread + ASYNC_CONSUMER_THREAD = consumer; + + consumer.start(); + + // Wait for the consumer Thread to finish. This is executed in the + // main Task thread and will be interrupted on cancellation. + while (consumer.isAlive()) { + try { + consumer.join(); + } catch (InterruptedException ignored) { + } + } + } + + /** + * The Thread consuming buffers. + */ + private static class ConsumerThread extends Thread { + + private final InputGate inputGate; + + public ConsumerThread(InputGate inputGate) { + this.inputGate = inputGate; + } + + @Override + public void run() { + try { + while (true) { + inputGate.getNextBufferOrEvent(); + } + } catch (Exception e) { + ASYNC_CONSUMER_EXCEPTION = e; + } + } + } + } +}
