Repository: flink Updated Branches: refs/heads/release-1.1 3ae6e9e09 -> 2b612f2d8
[FLINK-5114] [network] Handle partition producer state check for unregistered executions This closes #2975. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b612f2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b612f2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b612f2d Branch: refs/heads/release-1.1 Commit: 2b612f2d8fa2493fa1d0d586bc0fe10afa9150ca Parents: 3ae6e9e Author: Ufuk Celebi <[email protected]> Authored: Thu Dec 8 23:48:39 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Fri Dec 9 14:20:08 2016 +0100 ---------------------------------------------------------------------- .../executiongraph/IntermediateResult.java | 33 +++ .../runtime/io/network/NetworkEnvironment.java | 37 +-- .../netty/PartitionProducerStateChecker.java | 49 ++++ .../io/network/netty/PartitionStateChecker.java | 34 --- .../partition/consumer/SingleInputGate.java | 10 +- .../PartitionProducerDisposedException.java | 36 +++ .../flink/runtime/jobmanager/JobManager.scala | 74 ++++-- .../runtime/messages/JobManagerMessages.scala | 29 ++- .../runtime/messages/TaskControlMessages.scala | 25 +- .../flink/runtime/taskmanager/TaskManager.scala | 46 +++- .../partition/InputGateConcurrentTest.java | 10 +- .../partition/InputGateFairnessTest.java | 20 +- .../consumer/LocalInputChannelTest.java | 6 +- .../partition/consumer/SingleInputGateTest.java | 12 +- .../partition/consumer/TestSingleInputGate.java | 4 +- .../partition/consumer/UnionInputGateTest.java | 6 +- .../runtime/jobmanager/JobManagerTest.java | 252 ++++++++++++++++--- .../runtime/taskmanager/TaskManagerTest.java | 22 +- 18 files changed, 520 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 67b1fe0..6da2cd3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; @@ -34,6 +36,14 @@ public class IntermediateResult { private final IntermediateResultPartition[] partitions; + /** + * Maps intermediate result partition IDs to a partition index. This is + * used for ID lookups of intermediate results. I didn't dare to change the + * partition connect logic in other places that is tightly coupled to the + * partitions being held as an array. + */ + private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>(); + private final int numParallelProducers; private final AtomicInteger numberOfRunningProducers; @@ -80,6 +90,7 @@ public class IntermediateResult { } partitions[partitionNumber] = partition; + partitionLookupHelper.put(partition.getPartitionId(), partitionNumber); partitionsAssigned++; } @@ -95,6 +106,28 @@ public class IntermediateResult { return partitions; } + /** + * Returns the partition with the given ID. 
+ * + * @param resultPartitionId ID of the partition to look up + * @throws NullPointerException If partition ID is <code>null</code> + * @throws IllegalArgumentException Thrown if unknown partition ID + * @return Intermediate result partition with the given ID + */ + public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) { + // Looks up the partition number via the helper map and returns the + // partition. Currently, this happens infrequently enough that we could + // consider removing the map and scanning the partitions on every lookup. + // The lookups (currently) only happen when the producer of an intermediate + // result cannot be found via its registered execution. + Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID")); + if (partitionNumber != null) { + return partitions[partitionNumber]; + } else { + throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId); + } + } + public int getNumberOfAssignedPartitions() { return partitionsAssigned; } http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index d3715ed..7bac93b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -28,14 +28,14 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; import 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; +import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState; import org.apache.flink.runtime.messages.TaskMessages.FailTask; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; @@ -82,7 +82,7 @@ public class NetworkEnvironment { private ResultPartitionConsumableNotifier partitionConsumableNotifier; - private PartitionStateChecker partitionStateChecker; + private PartitionProducerStateChecker partitionStateChecker; private boolean isShutdown; @@ -143,7 +143,7 @@ public class NetworkEnvironment { return partitionConsumableNotifier; } - public PartitionStateChecker getPartitionStateChecker() { + public PartitionProducerStateChecker getPartitionProducerStateChecker() { return partitionStateChecker; } @@ -196,7 +196,7 @@ public class NetworkEnvironment { taskManagerGateway, jobManagerTimeout); - this.partitionStateChecker = new JobManagerPartitionStateChecker( + this.partitionStateChecker = new ActorGatewayPartitionProducerStateChecker( jobManagerGateway, taskManagerGateway); // ----- Network connections ----- @@ -474,28 +474,31 @@ public class NetworkEnvironment { } } - private static class JobManagerPartitionStateChecker 
implements PartitionStateChecker { + private static class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker { private final ActorGateway jobManager; private final ActorGateway taskManager; - public JobManagerPartitionStateChecker(ActorGateway jobManager, ActorGateway taskManager) { + ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, ActorGateway taskManager) { this.jobManager = jobManager; this.taskManager = taskManager; } @Override - public void triggerPartitionStateCheck( - JobID jobId, - ExecutionAttemptID executionAttemptID, - IntermediateDataSetID resultId, - ResultPartitionID partitionId) { - - RequestPartitionState msg = new RequestPartitionState( - jobId, partitionId, executionAttemptID, resultId); - - jobManager.tell(msg, taskManager); + public void requestPartitionProducerState( + JobID jobId, + ExecutionAttemptID receiverExecutionId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId) { + + jobManager.tell( + new RequestPartitionProducerState( + jobId, + receiverExecutionId, + intermediateDataSetId, + resultPartitionId), + taskManager); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java new file mode 100644 index 0000000..18d92b1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +/** + * Intermediate partition state checker to query the JobManager about the state + * of the producer of a result partition. + * + * <p>These checks are triggered when a partition request is answered with a + * PartitionNotFound event. This usually happens when the producer of that + * partition has not registered itself with the network stack or terminated. + */ +public interface PartitionProducerStateChecker { + + /** + * Requests the execution state of the execution producing a result partition. + * + * @param jobId ID of the job the partition belongs to. + * @param receiverExecutionId The execution attempt ID of the task who triggers the request. + * @param intermediateDataSetId ID of the parent intermediate data set. + * @param resultPartitionId ID of the result partition to check. 
This identifies the producing execution and partition. + */ + void requestPartitionProducerState( + JobID jobId, ExecutionAttemptID receiverExecutionId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java deleted file mode 100644 index ecbcdaa..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.runtime.io.network.netty; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; - -public interface PartitionStateChecker { - - void triggerPartitionStateCheck( - JobID jobId, - ExecutionAttemptID executionId, - IntermediateDataSetID resultId, - ResultPartitionID partitionId); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 1550b0d..67bc8d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -139,7 +139,7 @@ public class SingleInputGate implements InputGate { private final BitSet 
channelsWithEndOfPartitionEvents; /** The partition state checker to use for failed partition requests. */ - private final PartitionStateChecker partitionStateChecker; + private final PartitionProducerStateChecker partitionStateChecker; /** * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers @@ -172,7 +172,7 @@ public class SingleInputGate implements InputGate { IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels, - PartitionStateChecker partitionStateChecker, + PartitionProducerStateChecker partitionStateChecker, IOMetricGroup metrics) { this.owningTaskName = checkNotNull(owningTaskName); @@ -510,7 +510,7 @@ public class SingleInputGate implements InputGate { } void triggerPartitionStateCheck(ResultPartitionID partitionId) { - partitionStateChecker.triggerPartitionStateCheck( + partitionStateChecker.requestPartitionProducerState( jobId, executionId, consumedResultId, @@ -567,7 +567,7 @@ public class SingleInputGate implements InputGate { final SingleInputGate inputGate = new SingleInputGate( owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, - icdd.length, networkEnvironment.getPartitionStateChecker(), metrics); + icdd.length, networkEnvironment.getPartitionProducerStateChecker(), metrics); // Create the input channels. There is one input channel for each consumed partition. 
final InputChannel[] inputChannels = new InputChannel[icdd.length]; http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java new file mode 100644 index 0000000..12f2433 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState; + +/** + * Exception returned to a TaskManager on {@link RequestPartitionProducerState} + * if the producer of a partition has been disposed. 
+ */ +public class PartitionProducerDisposedException extends Exception { + + public PartitionProducerDisposedException(ResultPartitionID resultPartitionID) { + super(String.format("Execution %s producing partition %s has already been disposed.", + resultPartitionID.getProducerId(), + resultPartitionID.getPartitionId())); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 2b455b7..d6d23d9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace} -import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState} +import org.apache.flink.runtime.messages.TaskMessages.{PartitionProducerState, UpdateTaskExecutionState} import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} @@ -839,28 +839,68 @@ class JobManager( ) } - case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) => - val state = currentJobs.get(jobId) match { + case RequestPartitionProducerState( + jobId, + receiverId, + intermediateDataSetId, + resultPartitionId) => + + currentJobs.get(jobId) 
match { case Some((executionGraph, _)) => - val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId) + try { + // Find the execution attempt producing the intermediate result partition. + val execution = executionGraph + .getRegisteredExecutions + .get(resultPartitionId.getProducerId) + + if (execution != null) { + // Common case for pipelined exchanges => producing execution is + // still active. + val success = (intermediateDataSetId, resultPartitionId, execution.getState) + sender ! decorateMessage(PartitionProducerState(receiverId, Left(success))) + } else { + // The producing execution might have terminated and been + // unregistered. We now look for the producing execution via the + // intermediate result itself. + val intermediateResult = executionGraph + .getAllIntermediateResults.get(intermediateDataSetId) + + if (intermediateResult != null) { + // Try to find the producing execution + val producerExecution = intermediateResult + .getPartitionById(resultPartitionId.getPartitionId) + .getProducer + .getCurrentExecutionAttempt + + if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) { + val success = ( + intermediateDataSetId, + resultPartitionId, + producerExecution.getState) + sender ! decorateMessage(PartitionProducerState(receiverId, Left(success))) + } else { + val failure = new PartitionProducerDisposedException(resultPartitionId) + sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure))) + } + } else { + val failure = new IllegalArgumentException("Intermediate data set with ID" + + s"$intermediateDataSetId not found.") + sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure))) + } + } + } catch { + case e: Exception => + val failure = new RuntimeException("Failed to look up execution state of " + + s"producer with ID ${resultPartitionId.getProducerId}.", e) + sender ! 
decorateMessage(PartitionProducerState(receiverId, Right(failure))) + } - if (execution != null) execution.getState else null case None => - // Nothing to do. This is not an error, because the request is received when a sending - // task fails or is not yet available during a remote partition request. - log.debug(s"Cannot find execution graph for job $jobId.") + val failure = new IllegalArgumentException(s"Job with ID $jobId not found.") + sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure))) - null } - sender ! decorateMessage( - PartitionState( - taskExecutionId, - taskResultId, - partitionId.getPartitionId, - state) - ) - case RequestJobStatus(jobID) => currentJobs.get(jobID) match { case Some((executionGraph,_)) => http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 14f72b0..97006d2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -127,21 +127,24 @@ object JobManagerMessages { */ case class NextInputSplit(splitData: Array[Byte]) + /** - * Requests the current state of the partition. - * - * The state of a partition is currently bound to the state of the producing execution. - * - * @param jobId The job ID of the job, which produces the partition. - * @param partitionId The partition ID of the partition to request the state of. - * @param taskExecutionId The execution attempt ID of the task requesting the partition state. - * @param taskResultId The input gate ID of the task requesting the partition state. 
- */ - case class RequestPartitionState( + * Requests the execution state of the execution producing a result partition. + * + * @param jobId ID of the job the partition belongs to. + * @param receiverExecutionId Execution ID of the task who triggers the request. Required to + * return an answer to the TaskManager, which needs the ID to route + * this to the receiver task. + * @param intermediateDataSetId ID of the parent intermediate data set. + * @param resultPartitionId ID of the result partition to check. This + * identifies the producing execution and + * partition. + */ + case class RequestPartitionProducerState( jobId: JobID, - partitionId: ResultPartitionID, - taskExecutionId: ExecutionAttemptID, - taskResultId: IntermediateDataSetID) + receiverExecutionId: ExecutionAttemptID, + intermediateDataSetId: IntermediateDataSetID, + resultPartitionId: ResultPartitionID) extends RequiresLeaderSessionID /** http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala index 94762ee..e73d651 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala @@ -21,8 +21,9 @@ package org.apache.flink.runtime.messages import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor} import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID} -import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState +import org.apache.flink.runtime.io.network.partition.ResultPartitionID +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID +import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState import org.apache.flink.runtime.taskmanager.TaskExecutionState /** @@ -92,13 +93,19 @@ object TaskMessages { // -------------------------------------------------------------------------- /** - * Answer to a [[RequestPartitionState]] with the state of the respective partition. - */ - case class PartitionState( - taskExecutionId: ExecutionAttemptID, - taskResultId: IntermediateDataSetID, - partitionId: IntermediateResultPartitionID, - state: ExecutionState) + * Answer to a [[RequestPartitionProducerState]] with the state of the partition producer. + * + * @param receiverExecutionId The execution attempt ID of the task who triggered the + * original request and should receive this update. + * @param result Either a successful or failed partition producer state check + * result. On success, this is a 3-tuple of intermediate data set ID + * (to identify the input gate), the partition ID (to identify the + * channel) and the producer state. On failure, this contains the + * failure cause. 
+ */ + case class PartitionProducerState( + receiverExecutionId: ExecutionAttemptID, + result: Either[(IntermediateDataSetID, ResultPartitionID, ExecutionState), Exception]) extends TaskMessage with RequiresLeaderSessionID /** http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 40ae234..a751865 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -24,7 +24,7 @@ import java.lang.reflect.Method import java.net.{InetAddress, InetSocketAddress} import java.util import java.util.UUID -import java.util.concurrent.TimeUnit +import java.util.concurrent.{TimeUnit, TimeoutException} import javax.management.ObjectName import _root_.akka.actor._ @@ -42,11 +42,11 @@ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot -import org.apache.flink.runtime.clusterframework.messages.StopCluster -import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor} import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService} import org.apache.flink.runtime.broadcast.BroadcastVariableManager +import org.apache.flink.runtime.clusterframework.messages.StopCluster +import org.apache.flink.runtime.clusterframework.types.ResourceID import 
org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor} import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager} @@ -58,16 +58,17 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync} import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.netty.NettyConfig import org.apache.flink.runtime.jobgraph.IntermediateDataSetID +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.Messages._ import org.apache.flink.runtime.messages.RegistrationMessages._ -import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, SampleTaskStackTrace, StackTraceSampleMessages, TriggerStackTraceSample} +import org.apache.flink.runtime.messages.StackTraceSampleMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages._ import org.apache.flink.runtime.messages.TaskMessages._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint} -import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner @@ -80,7 +81,7 @@ import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.forkjoin.ForkJoinPool import 
scala.language.postfixOps -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Success} /** * The TaskManager is responsible for executing the individual tasks of a Flink job. It is @@ -503,15 +504,38 @@ class TaskManager( ) } - case PartitionState(taskExecutionId, taskResultId, partitionId, state) => - Option(runningTasks.get(taskExecutionId)) match { + // Updates the partition producer state + case PartitionProducerState(receiverExecutionId, result) => + Option(runningTasks.get(receiverExecutionId)) match { case Some(task) => - task.onPartitionStateUpdate(taskResultId, partitionId, state) + try { + result match { + case Left((intermediateDataSetId, resultPartitionId, producerState)) => + // Forward the state update to the task + task.onPartitionStateUpdate( + intermediateDataSetId, + resultPartitionId.getPartitionId, + producerState) + + case Right(failure) => + // Cancel or fail the execution + if (failure.isInstanceOf[PartitionProducerDisposedException]) { + log.info("Partition producer disposed. 
Cancelling " + + s"execution $receiverExecutionId.", failure) + task.cancelExecution() + } else { + task.failExternally(failure) + } + } + } catch { + case e: Exception => task.failExternally(e) + } case None => - log.debug(s"Cannot find task $taskExecutionId to respond with partition state.") + log.debug(s"Cannot find task with ID $receiverExecutionId to forward partition " + + "state respond to.") } } - } + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java index a5f4c7d..955f335 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; @@ -65,7 +65,7 @@ public class InputGateConcurrentTest { new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); for (int i = 0; i < numChannels; i++) { @@ -102,7 +102,7 @@ public class InputGateConcurrentTest { new IntermediateDataSetID(), 0, numChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); for (int i = 0; i < numChannels; i++) { @@ -152,7 +152,7 @@ public class InputGateConcurrentTest { new IntermediateDataSetID(), 0, numChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); for (int i = 0, local = 0; i < numChannels; i++) { @@ -192,7 +192,7 @@ public class InputGateConcurrentTest { // ------------------------------------------------------------------------ private static abstract class Source { - + abstract void addBuffer(Buffer buffer) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index 192b0eb..c367018 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import 
org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; @@ -92,7 +92,7 @@ public class InputGateFairnessTest { new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); for (int i = 0; i < numChannels; i++) { @@ -146,7 +146,7 @@ public class InputGateFairnessTest { new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); for (int i = 0; i < numChannels; i++) { @@ -197,7 +197,7 @@ public class InputGateFairnessTest { new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final ConnectionManager connManager = createDummyConnectionManager(); @@ -206,11 +206,11 @@ public class InputGateFairnessTest { for (int i = 0; i < numChannels; i++) { RemoteInputChannel channel = new RemoteInputChannel( - gate, i, new ResultPartitionID(), mock(ConnectionID.class), + gate, i, new ResultPartitionID(), mock(ConnectionID.class), connManager, new Tuple2<>(0, 0), new DummyIOMetricGroup()); channels[i] = channel; - + for (int p = 0; p < buffersPerChannel; p++) { channel.onBuffer(mockBuffer, p); } @@ -253,7 +253,7 @@ public class InputGateFairnessTest { new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final ConnectionManager connManager 
= createDummyConnectionManager(); @@ -335,7 +335,7 @@ public class InputGateFairnessTest { partitions[i].onBuffer(buffer, sequenceNumbers[i]++); } } - + // ------------------------------------------------------------------------ private static class FairnessVerifyingInputGate extends SingleInputGate { @@ -352,11 +352,11 @@ public class InputGateFairnessTest { IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels, - PartitionStateChecker partitionStateChecker, + PartitionProducerStateChecker partitionProducerStateChecker, IOMetricGroup metrics) { super(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, - numberOfInputChannels, partitionStateChecker, metrics); + numberOfInputChannels, partitionProducerStateChecker, metrics); try { Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData"); http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 5d0a106..411f344 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import 
org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartition; @@ -284,7 +284,7 @@ public class LocalInputChannelTest { new IntermediateDataSetID(), 0, 1, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup() ); @@ -481,7 +481,7 @@ public class LocalInputChannelTest { new IntermediateDataSetID(), subpartitionIndex, numberOfInputChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); // Set buffer pool http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index ec4b31d..f2fb2d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; @@ -74,7 +74,7 @@ public class SingleInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final TestInputChannel[] inputChannels = new TestInputChannel[]{ new TestInputChannel(inputGate, 0), @@ -128,7 +128,7 @@ public class SingleInputGateTest { // Setup reader with one local and one unknown input channel final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -183,7 +183,7 @@ public class SingleInputGateTest { new IntermediateDataSetID(), 0, 1, - mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); @@ -223,7 
+223,7 @@ public class SingleInputGateTest { new IntermediateDataSetID(), 0, 1, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); InputChannel unknown = new UnknownInputChannel( @@ -314,7 +314,7 @@ public class SingleInputGateTest { NetworkEnvironment netEnv = mock(NetworkEnvironment.class); when(netEnv.getPartitionManager()).thenReturn(new ResultPartitionManager()); when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher()); - when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class)); + when(netEnv.getPartitionProducerStateChecker()).thenReturn(mock(PartitionProducerStateChecker.class)); when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new Tuple2<>(initialBackoff, maxBackoff)); when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager()); http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 0749467..b1d9483 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; @@ -59,7 +59,7 @@ public class TestSingleInputGate { new IntermediateDataSetID(), 0, numberOfInputChannels, - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); this.inputGate = spy(realGate); http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index faec77e..8cfda0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.junit.Test; @@ -43,8 +43,8 @@ public class UnionInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final String testTaskName = "Test Task"; - final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new 
IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); - final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index b56bf29..e41982e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -48,12 +48,12 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; +import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState; import org.apache.flink.runtime.messages.JobManagerMessages.StopJob; import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure; import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; -import org.apache.flink.runtime.messages.TaskMessages.PartitionState; +import org.apache.flink.runtime.messages.TaskMessages.PartitionProducerState; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManager; @@ -89,7 +89,6 @@ import static org.apache.flink.runtime.io.network.partition.ResultPartitionType. import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -178,7 +177,7 @@ public class JobManagerTest extends TestLogger { // Request the execution graph to get the runtime info jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway); - final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class) + final ExecutionGraph eg = (ExecutionGraph) expectMsgClass(ExecutionGraphFound.class) .executionGraph(); final ExecutionVertex vertex = eg.getJobVertex(sender.getID()) @@ -193,59 +192,236 @@ public class JobManagerTest extends TestLogger { // - The test ---------------------------------------------------------------------- + ExecutionAttemptID receiverId = new ExecutionAttemptID(); + // 1. 
All execution states - RequestPartitionState request = new RequestPartitionState( - jid, partitionId, receiver, rid); + RequestPartitionProducerState request = new RequestPartitionProducerState( + jid, receiverId, rid, partitionId); for (ExecutionState state : ExecutionState.values()) { ExecutionGraphTestUtils.setVertexState(vertex, state); - jobManagerGateway.tell(request, testActorGateway); + Future<?> futurePartitionState = jobManagerGateway + .ask(request, getRemainingTime()); - LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class); + LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime()); + PartitionProducerState resp = (PartitionProducerState) (PartitionProducerState) wrappedMsg.message(); + assertEquals(receiverId, resp.receiverExecutionId()); + assertTrue("Responded with failure: " + resp, resp.result().isLeft()); + assertEquals(state, resp.result().left().get()._3()); + } - assertEquals(PartitionState.class, lsm.message().getClass()); + // 2. 
Non-existing execution + request = new RequestPartitionProducerState(jid, receiverId, rid, new ResultPartitionID()); - PartitionState resp = (PartitionState) lsm.message(); + Future<?> futurePartitionState = jobManagerGateway.ask(request, getRemainingTime()); + LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime()); + PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message(); + assertEquals(receiverId, resp.receiverExecutionId()); + assertTrue("Responded with success: " + resp, resp.result().isRight()); + assertTrue(resp.result().right().get() instanceof RuntimeException); + assertTrue(resp.result().right().get().getCause() instanceof IllegalArgumentException); - assertEquals(request.taskExecutionId(), resp.taskExecutionId()); - assertEquals(request.taskResultId(), resp.taskResultId()); - assertEquals(request.partitionId().getPartitionId(), resp.partitionId()); - assertEquals(state, resp.state()); + // 3. Non-existing job + request = new RequestPartitionProducerState(new JobID(), receiverId, rid, new ResultPartitionID()); + futurePartitionState = jobManagerGateway.ask(request, getRemainingTime()); + wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime()); + resp = (PartitionProducerState) wrappedMsg.message(); + assertEquals(receiverId, resp.receiverExecutionId()); + assertTrue("Responded with success: " + resp, resp.result().isRight()); + assertTrue(resp.result().right().get() instanceof IllegalArgumentException); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (cluster != null) { + cluster.shutdown(); } + } + } + }; + }}; + } - // 2. Non-existing execution - request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid); + /** + * Tests the JobManager response when the execution is not registered with + * the ExecutionGraph. 
+ */ + @Test + public void testRequestPartitionStateUnregisteredExecution() throws Exception { + new JavaTestKit(system) {{ + new Within(duration("15 seconds")) { + @Override + protected void run() { + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + final IntermediateDataSetID rid = new IntermediateDataSetID(); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(1); + sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish + sender.createAndAddResultDataSet(rid, PIPELINED); - jobManagerGateway.tell(request, testActorGateway); + final JobVertex sender2 = new JobVertex("Blocking Sender"); + sender2.setParallelism(1); + sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED); - LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class); + final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2); + final JobID jid = jobGraph.getJobID(); - assertEquals(PartitionState.class, lsm.message().getClass()); + final ActorGateway jobManagerGateway = cluster.getLeaderGateway( + TestingUtils.TESTING_DURATION()); - PartitionState resp = (PartitionState) lsm.message(); + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); - assertEquals(request.taskExecutionId(), resp.taskExecutionId()); - assertEquals(request.taskResultId(), resp.taskResultId()); - assertEquals(request.partitionId().getPartitionId(), resp.partitionId()); - assertNull(resp.state()); + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); - // 3. 
Non-existing job - request = new RequestPartitionState( - new JobID(), new ResultPartitionID(), receiver, rid); + jobManagerGateway.tell( + new WaitForAllVerticesToBeRunningOrFinished(jid), + testActorGateway); + + expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class); + + Future<Object> egFuture = jobManagerGateway.ask( + new RequestExecutionGraph(jobGraph.getJobID()), remaining()); + + ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining()); + ExecutionGraph eg = egFound.executionGraph(); + + ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0]; + while (vertex.getExecutionState() != ExecutionState.FINISHED) { + Thread.sleep(1); + } + + IntermediateResultPartition partition = vertex.getProducedPartitions() + .values().iterator().next(); + + ResultPartitionID partitionId = new ResultPartitionID( + partition.getPartitionId(), + vertex.getCurrentExecutionAttempt().getAttemptId()); + + // Producer finished, request state + ExecutionAttemptID receiverId = new ExecutionAttemptID(); + + Future<?> producerStateFuture = jobManagerGateway.ask( + new RequestPartitionProducerState(jid, receiverId, rid, partitionId), getRemainingTime()); + + LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime()); + PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message(); + assertEquals(receiverId, resp.receiverExecutionId()); + assertTrue("Responded with failure: " + resp, resp.result().isLeft()); + assertEquals(ExecutionState.FINISHED, resp.result().left().get()._3()); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + }; + }}; + } + + /** + * Tests the JobManager response when the execution is not registered with + * the ExecutionGraph anymore and a new execution attempt is available. 
+ */ + @Test + public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception { + new JavaTestKit(system) {{ + new Within(duration("15 seconds")) { + @Override + protected void run() { + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + final IntermediateDataSetID rid = new IntermediateDataSetID(); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(1); + sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish + sender.createAndAddResultDataSet(rid, PIPELINED); + + final JobVertex sender2 = new JobVertex("Blocking Sender"); + sender2.setParallelism(1); + sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED); - jobManagerGateway.tell(request, testActorGateway); + final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2); + final JobID jid = jobGraph.getJobID(); + + final ActorGateway jobManagerGateway = cluster.getLeaderGateway( + TestingUtils.TESTING_DURATION()); + + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); + + jobManagerGateway.tell( + new WaitForAllVerticesToBeRunningOrFinished(jid), + testActorGateway); + + expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class); + + Future<Object> egFuture = jobManagerGateway.ask( + new RequestExecutionGraph(jobGraph.getJobID()), remaining()); + + ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining()); + ExecutionGraph eg = 
(ExecutionGraph) egFound.executionGraph(); + + ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0]; + while (vertex.getExecutionState() != ExecutionState.FINISHED) { + Thread.sleep(1); + } + + IntermediateResultPartition partition = vertex.getProducedPartitions() + .values().iterator().next(); + + ResultPartitionID partitionId = new ResultPartitionID( + partition.getPartitionId(), + vertex.getCurrentExecutionAttempt().getAttemptId()); - lsm = expectMsgClass(LeaderSessionMessage.class); + // Reset execution => new execution attempt + vertex.resetForNewExecution(); - assertEquals(PartitionState.class, lsm.message().getClass()); + // Producer finished, request state + ExecutionAttemptID receiverId = new ExecutionAttemptID(); - resp = (PartitionState) lsm.message(); + Object request = new RequestPartitionProducerState(jid, receiverId, rid, partitionId); - assertEquals(request.taskExecutionId(), resp.taskExecutionId()); - assertEquals(request.taskResultId(), resp.taskResultId()); - assertEquals(request.partitionId().getPartitionId(), resp.partitionId()); - assertNull(resp.state()); + Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime()); + + LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime()); + PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message(); + assertEquals(receiverId, resp.receiverExecutionId()); + assertTrue("Responded with success: " + resp, resp.result().isRight()); + assertTrue(resp.result().right().get() instanceof PartitionProducerDisposedException); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -366,11 +542,7 @@ public class JobManagerTest extends TestLogger { } /** - system.dispatcher(), - actorSystem.dispatcher(), - * Tests that we can trigger a - * - * @throws Exception + * Tests that we can trigger a savepoint when periodic checkpointing is disabled. 
*/ @Test public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 6a696a0..87fb24c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.RegistrationMessages; import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure; @@ -63,7 +64,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError; import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; -import org.apache.flink.runtime.messages.TaskMessages.PartitionState; import org.apache.flink.runtime.messages.TaskMessages.StopTask; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; @@ -81,10 +81,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; +import scala.Tuple3; import scala.concurrent.Await; import scala.concurrent.Future; 
import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; +import scala.util.Left; import java.io.IOException; import java.net.InetSocketAddress; @@ -100,7 +102,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1586,14 +1587,15 @@ public class TaskManagerTest extends TestLogger { @Override public void handleMessage(Object message) throws Exception { - if (message instanceof RequestPartitionState) { - final RequestPartitionState msg = (RequestPartitionState) message; - - PartitionState resp = new PartitionState( - msg.taskExecutionId(), - msg.taskResultId(), - msg.partitionId().getPartitionId(), - ExecutionState.RUNNING); + if (message instanceof JobManagerMessages.RequestPartitionProducerState) { + JobManagerMessages.RequestPartitionProducerState msg = (JobManagerMessages.RequestPartitionProducerState) message; + + TaskMessages.PartitionProducerState resp = new TaskMessages.PartitionProducerState( + msg.receiverExecutionId(), + new Left<Tuple3<IntermediateDataSetID, ResultPartitionID, ExecutionState>, Exception>(new Tuple3<>( + msg.intermediateDataSetId(), + msg.resultPartitionId(), + ExecutionState.RUNNING))); getSender().tell(decorateMessage(resp), getSelf()); }
