[FLINK-5114] [network] Handle partition producer state check for unregistered executions
This closes #2912. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a078666d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a078666d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a078666d Branch: refs/heads/master Commit: a078666d42ab4dae01dedaa50d55343ce141fcb8 Parents: 47db9cb Author: Ufuk Celebi <[email protected]> Authored: Tue Nov 22 16:15:04 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Mon Dec 12 17:39:16 2016 +0100 ---------------------------------------------------------------------- .../executiongraph/IntermediateResult.java | 37 ++- .../runtime/io/network/PartitionState.java | 64 ----- .../netty/PartitionProducerStateChecker.java | 52 +++++ .../io/network/netty/PartitionStateChecker.java | 35 --- .../partition/consumer/SingleInputGate.java | 13 +- .../PartitionProducerDisposedException.java | 36 +++ ...torGatewayPartitionProducerStateChecker.java | 66 ++++++ .../ActorGatewayPartitionStateChecker.java | 67 ------ .../apache/flink/runtime/taskmanager/Task.java | 121 ++++++---- .../flink/runtime/taskmanager/TaskActions.java | 20 +- .../flink/runtime/jobmanager/JobManager.scala | 70 ++++-- .../runtime/messages/JobManagerMessages.scala | 24 +- .../flink/runtime/taskmanager/TaskManager.scala | 6 +- .../partition/InputGateConcurrentTest.java | 4 - .../partition/InputGateFairnessTest.java | 8 +- .../consumer/LocalInputChannelTest.java | 3 - .../partition/consumer/SingleInputGateTest.java | 6 +- .../partition/consumer/TestSingleInputGate.java | 2 - .../partition/consumer/UnionInputGateTest.java | 5 +- .../runtime/jobmanager/JobManagerTest.java | 234 ++++++++++++++++--- .../runtime/taskmanager/TaskAsyncCallTest.java | 6 +- .../runtime/taskmanager/TaskManagerTest.java | 14 +- .../flink/runtime/taskmanager/TaskStopTest.java | 4 +- .../flink/runtime/taskmanager/TaskTest.java | 148 +++++++++++- .../runtime/tasks/BlockingCheckpointsTest.java | 7 +- 
.../tasks/InterruptSensitiveRestoreTest.java | 4 +- .../streaming/runtime/tasks/StreamTaskTest.java | 6 +- 27 files changed, 698 insertions(+), 364 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index c2c19d1..313272c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; @@ -34,6 +36,14 @@ public class IntermediateResult { private final IntermediateResultPartition[] partitions; + /** + * Maps intermediate result partition IDs to a partition index. This is + * used for ID lookups of intermediate results. I didn't dare to change the + * partition connect logic in other places that is tightly coupled to the + * partitions being held as an array. 
+ */ + private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>(); + private final int numParallelProducers; private final AtomicInteger numberOfRunningProducers; @@ -54,10 +64,12 @@ public class IntermediateResult { this.id = checkNotNull(id); this.producer = checkNotNull(producer); - this.partitions = new IntermediateResultPartition[numParallelProducers]; + checkArgument(numParallelProducers >= 1); this.numParallelProducers = numParallelProducers; + this.partitions = new IntermediateResultPartition[numParallelProducers]; + this.numberOfRunningProducers = new AtomicInteger(numParallelProducers); // we do not set the intermediate result partitions here, because we let them be initialized by @@ -80,6 +92,7 @@ public class IntermediateResult { } partitions[partitionNumber] = partition; + partitionLookupHelper.put(partition.getPartitionId(), partitionNumber); partitionsAssigned++; } @@ -95,6 +108,28 @@ public class IntermediateResult { return partitions; } + /** + * Returns the partition with the given ID. + * + * @param resultPartitionId ID of the partition to look up + * @throws NullPointerException If partition ID <code>null</code> + * @throws IllegalArgumentException Thrown if unknown partition ID + * @return Intermediate result partition with the given ID + */ + public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) { + // Looks up the partition number via the helper map and returns the + // partition. Currently, this happens infrequently enough that we could + // consider removing the map and scanning the partitions on every lookup. + // The lookup (currently) only happens when the producer of an intermediate + // result cannot be found via its registered execution. 
+ Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID")); + if (partitionNumber != null) { + return partitions[partitionNumber]; + } else { + throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId); + } + } + public int getNumberOfAssignedPartitions() { return partitionsAssigned; } http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java deleted file mode 100644 index 59357fc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network; - -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; -import java.io.Serializable; - -/** - * Contains information about the state of a result partition. - */ -public class PartitionState implements Serializable { - - private static final long serialVersionUID = -4693651272083825031L; - - private final IntermediateDataSetID intermediateDataSetID; - private final IntermediateResultPartitionID intermediateResultPartitionID; - private final ExecutionState executionState; - - public PartitionState( - IntermediateDataSetID intermediateDataSetID, - IntermediateResultPartitionID intermediateResultPartitionID, - @Nullable ExecutionState executionState) { - - this.intermediateDataSetID = Preconditions.checkNotNull(intermediateDataSetID); - this.intermediateResultPartitionID = Preconditions.checkNotNull(intermediateResultPartitionID); - this.executionState = executionState; - } - - public IntermediateDataSetID getIntermediateDataSetID() { - return intermediateDataSetID; - } - - public IntermediateResultPartitionID getIntermediateResultPartitionID() { - return intermediateResultPartitionID; - } - - /** - * Returns the execution state of the partition producer or <code>null</code> if it is not available. 
- */ - public ExecutionState getExecutionState() { - return executionState; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java new file mode 100644 index 0000000..d0b7e1e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +/** + * Intermediate partition state checker to query the JobManager about the state + * of the producer of a result partition. + * + * <p>These checks are triggered when a partition request is answered with a + * PartitionNotFound event. This usually happens when the producer of that + * partition has not registered itself with the network stack or terminated. + */ +public interface PartitionProducerStateChecker { + + /** + * Requests the execution state of the execution producing a result partition. + * + * @param jobId ID of the job the partition belongs to. + * @param intermediateDataSetId ID of the parent intermediate data set. + * @param resultPartitionId ID of the result partition to check. This + * identifies the producing execution and partition. + * + * @return Future holding the execution state of the producing execution. 
+ */ + Future<ExecutionState> requestPartitionProducerState( + JobID jobId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java deleted file mode 100644 index 949f426..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.netty; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.io.network.PartitionState; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; - -public interface PartitionStateChecker { - Future<PartitionState> requestPartitionState( - JobID jobId, - ExecutionAttemptID executionId, - IntermediateDataSetID resultId, - ResultPartitionID partitionId); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index b4d8d2c..d546559 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -108,9 +108,6 @@ public class SingleInputGate implements InputGate { /** The job ID of the owning task. */ private final JobID jobId; - /** The execution attempt ID of the owning task. */ - private final ExecutionAttemptID executionId; - /** * The ID of the consumed intermediate result. Each input gate consumes partitions of the * intermediate result specified by this ID. 
This ID also identifies the input gate at the @@ -168,7 +165,6 @@ public class SingleInputGate implements InputGate { public SingleInputGate( String owningTaskName, JobID jobId, - ExecutionAttemptID executionId, IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels, @@ -177,7 +173,6 @@ public class SingleInputGate implements InputGate { this.owningTaskName = checkNotNull(owningTaskName); this.jobId = checkNotNull(jobId); - this.executionId = checkNotNull(executionId); this.consumedResultId = checkNotNull(consumedResultId); @@ -530,11 +525,7 @@ public class SingleInputGate implements InputGate { } void triggerPartitionStateCheck(ResultPartitionID partitionId) { - taskActions.triggerPartitionStateCheck( - jobId, - executionId, - consumedResultId, - partitionId); + taskActions.triggerPartitionProducerStateCheck(jobId, consumedResultId, partitionId); } private void queueChannel(InputChannel channel) { @@ -587,7 +578,7 @@ public class SingleInputGate implements InputGate { final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); final SingleInputGate inputGate = new SingleInputGate( - owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, + owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex, icdd.length, taskActions, metrics); // Create the input channels. There is one input channel for each consumed partition. 
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java new file mode 100644 index 0000000..12f2433 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState; + +/** + * Exception returned to a TaskManager on {@link RequestPartitionProducerState} + * if the producer of a partition has been disposed. 
+ */ +public class PartitionProducerDisposedException extends Exception { + + public PartitionProducerDisposedException(ResultPartitionID resultPartitionID) { + super(String.format("Execution %s producing partition %s has already been disposed.", + resultPartitionID.getProducerId(), + resultPartitionID.getPartitionId())); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java new file mode 100644 index 0000000..5c229a9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.util.Preconditions; +import scala.concurrent.duration.FiniteDuration; +import scala.reflect.ClassTag$; + +/** + * This implementation uses {@link ActorGateway} to trigger the partition state check at the job + * manager. + */ +public class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker { + + private final ActorGateway jobManager; + private final FiniteDuration timeout; + + public ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, FiniteDuration timeout) { + this.jobManager = Preconditions.checkNotNull(jobManager); + this.timeout = Preconditions.checkNotNull(timeout); + } + + @Override + public Future<ExecutionState> requestPartitionProducerState( + JobID jobId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId) { + + JobManagerMessages.RequestPartitionProducerState msg = new JobManagerMessages.RequestPartitionProducerState( + jobId, + intermediateDataSetId, resultPartitionId + ); + + scala.concurrent.Future<ExecutionState> futureResponse = jobManager + .ask(msg, timeout) + .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class)); + + return new FlinkFuture<>(futureResponse); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java deleted file mode 100644 index efa6ec3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkFuture; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.io.network.PartitionState; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.util.Preconditions; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -/** - * This implementation uses {@link ActorGateway} to trigger the partition state check at the job - * manager. - */ -public class ActorGatewayPartitionStateChecker implements PartitionStateChecker { - - private final ActorGateway jobManager; - private final FiniteDuration timeout; - - public ActorGatewayPartitionStateChecker(ActorGateway jobManager, FiniteDuration timeout) { - this.jobManager = Preconditions.checkNotNull(jobManager); - this.timeout = Preconditions.checkNotNull(timeout); - } - - @Override - public Future<PartitionState> requestPartitionState( - JobID jobId, - ExecutionAttemptID executionAttemptId, - IntermediateDataSetID resultId, - ResultPartitionID partitionId) { - JobManagerMessages.RequestPartitionState msg = new JobManagerMessages.RequestPartitionState( - jobId, - partitionId, - executionAttemptId, - resultId); - - scala.concurrent.Future<PartitionState> futureResponse = jobManager - .ask(msg, timeout) - .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class)); - - return new FlinkFuture<>(futureResponse); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 184c3b1..a1fb35e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -46,20 +46,19 @@ import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.PartitionState; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import 
org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -206,8 +205,8 @@ public class Task implements Runnable, TaskActions { /** Parent group for all metrics of this task */ private final TaskMetricGroup metrics; - /** Partition state checker to request partition states from */ - private final PartitionStateChecker partitionStateChecker; + /** Partition producer state checker to request partition states from */ + private final PartitionProducerStateChecker partitionProducerStateChecker; /** Executor to run future callbacks */ private final Executor executor; @@ -271,7 +270,7 @@ public class Task implements Runnable, TaskActions { TaskManagerRuntimeInfo taskManagerConfig, TaskMetricGroup metricGroup, ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, - PartitionStateChecker partitionStateChecker, + PartitionProducerStateChecker partitionProducerStateChecker, Executor executor) { Preconditions.checkNotNull(jobInformation); @@ -321,7 +320,7 @@ public class Task implements Runnable, TaskActions { this.taskExecutionStateListeners = new CopyOnWriteArrayList<>(); this.metrics = metricGroup; - this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker); + this.partitionProducerStateChecker = Preconditions.checkNotNull(partitionProducerStateChecker); this.executor = Preconditions.checkNotNull(executor); // create the reader and writer structures @@ -1036,32 +1035,37 @@ public class Task implements Runnable, TaskActions { // ------------------------------------------------------------------------ @Override - public void triggerPartitionStateCheck( + public void triggerPartitionProducerStateCheck( JobID jobId, - ExecutionAttemptID executionId, - final IntermediateDataSetID resultId, - final ResultPartitionID partitionId) { - org.apache.flink.runtime.concurrent.Future<PartitionState> futurePartitionState = 
partitionStateChecker.requestPartitionState( - jobId, - executionId, - resultId, - partitionId); - - futurePartitionState.handleAsync(new BiFunction<PartitionState, Throwable, Void>() { + final IntermediateDataSetID intermediateDataSetId, + final ResultPartitionID resultPartitionId) { + + org.apache.flink.runtime.concurrent.Future<ExecutionState> futurePartitionState = + partitionProducerStateChecker.requestPartitionProducerState( + jobId, + intermediateDataSetId, + resultPartitionId); + + futurePartitionState.handleAsync(new BiFunction<ExecutionState, Throwable, Void>() { @Override - public Void apply(PartitionState partitionState, Throwable throwable) { + public Void apply(ExecutionState executionState, Throwable throwable) { try { - if (partitionState != null) { + if (executionState != null) { onPartitionStateUpdate( - partitionState.getIntermediateDataSetID(), - partitionState.getIntermediateResultPartitionID(), - partitionState.getExecutionState()); + intermediateDataSetId, + resultPartitionId, + executionState); } else if (throwable instanceof TimeoutException) { // our request timed out, assume we're still running and try again onPartitionStateUpdate( - resultId, - partitionId.getPartitionId(), + intermediateDataSetId, + resultPartitionId, ExecutionState.RUNNING); + } else if (throwable instanceof PartitionProducerDisposedException) { + String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.", + resultPartitionId.getProducerId(), resultPartitionId.getPartitionId()); + LOG.info(msg, throwable); + cancelExecution(); } else { failExternally(throwable); } @@ -1183,41 +1187,58 @@ public class Task implements Runnable, TaskActions { /** * Answer to a partition state check issued after a failed partition request. 
*/ - public void onPartitionStateUpdate( - IntermediateDataSetID resultId, - IntermediateResultPartitionID partitionId, - ExecutionState partitionState) throws IOException, InterruptedException { + @VisibleForTesting + void onPartitionStateUpdate( + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId, + ExecutionState producerState) throws IOException, InterruptedException { if (executionState == ExecutionState.RUNNING) { - final SingleInputGate inputGate = inputGatesById.get(resultId); + final SingleInputGate inputGate = inputGatesById.get(intermediateDataSetId); if (inputGate != null) { - if (partitionState == ExecutionState.RUNNING || - partitionState == ExecutionState.FINISHED || - partitionState == ExecutionState.SCHEDULED || - partitionState == ExecutionState.DEPLOYING) { + if (producerState == ExecutionState.SCHEDULED + || producerState == ExecutionState.DEPLOYING + || producerState == ExecutionState.RUNNING + || producerState == ExecutionState.FINISHED) { // Retrigger the partition request - inputGate.retriggerPartitionRequest(partitionId); - } - else if (partitionState == ExecutionState.CANCELED - || partitionState == ExecutionState.CANCELING - || partitionState == ExecutionState.FAILED) { + inputGate.retriggerPartitionRequest(resultPartitionId.getPartitionId()); + + } else if (producerState == ExecutionState.CANCELING + || producerState == ExecutionState.CANCELED + || producerState == ExecutionState.FAILED) { + + // The producing execution has been canceled or failed. We + // don't need to re-trigger the request since it cannot + // succeed. + if (LOG.isDebugEnabled()) { + LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.", + taskNameWithSubtask, + resultPartitionId.getPartitionId(), + resultPartitionId.getProducerId(), + producerState); + } cancelExecution(); + } else { + // Any other execution state is unexpected. 
Currently, only + // state CREATED is left out of the checked states. If we + // see a producer in this state, something went wrong with + // scheduling in topological order. + String msg = String.format("Producer with attempt ID %s of partition %s in unexpected state %s.", + resultPartitionId.getProducerId(), + resultPartitionId.getPartitionId(), + producerState); + + failExternally(new IllegalStateException(msg)); } - else { - failExternally(new IllegalStateException("Received unexpected partition state " - + partitionState + " for partition request. This is a bug.")); - } - } - else { - failExternally(new IllegalStateException("Received partition state for " + - "unknown input gate " + resultId + ". This is a bug.")); + } else { + failExternally(new IllegalStateException("Received partition producer state for " + + "unknown input gate " + intermediateDataSetId + ".")); } - } - else { - LOG.debug("Ignoring partition state notification for not running task."); + } else { + LOG.debug("Task {} ignored a partition producer state notification, because it's not running.", taskNameWithSubtask); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java index 4f12691..f7650d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -29,21 +28,20 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; public interface TaskActions { /** - * Check the partition state of the given partition. + * Check the execution state of the execution producing a result partition. * - * @param jobId of the partition - * @param executionId of the partition - * @param resultId of the partition - * @param partitionId of the partition + * @param jobId ID of the job the partition belongs to. + * @param intermediateDataSetId ID of the parent intermediate data set. + * @param resultPartitionId ID of the result partition to check. This + * identifies the producing execution and partition. */ - void triggerPartitionStateCheck( + void triggerPartitionProducerStateCheck( JobID jobId, - ExecutionAttemptID executionId, - IntermediateDataSetID resultId, - ResultPartitionID partitionId); + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId); /** - * Fail the owning task with the given throwawble. + * Fail the owning task with the given throwable. 
* * @param cause of the failure */ http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1dfd3db..8c686cd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -50,8 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory import org.apache.flink.runtime.executiongraph._ import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager} -import org.apache.flink.runtime.io.network.PartitionState -import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway @@ -78,7 +77,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} -import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} +import org.apache.flink.runtime.{FlinkActor, JobException, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils} import org.jboss.netty.channel.ChannelException @@ -935,27 +934,58 @@ class 
JobManager( ) } - case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) => - val state = currentJobs.get(jobId) match { + case RequestPartitionProducerState(jobId, intermediateDataSetId, resultPartitionId) => + currentJobs.get(jobId) match { case Some((executionGraph, _)) => - val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId) + try { + // Find the execution attempt producing the intermediate result partition. + val execution = executionGraph + .getRegisteredExecutions + .get(resultPartitionId.getProducerId) + + if (execution != null) { + // Common case for pipelined exchanges => producing execution is + // still active. + sender ! decorateMessage(execution.getState) + } else { + // The producing execution might have terminated and been + // unregistered. We now look for the producing execution via the + // intermediate result itself. + val intermediateResult = executionGraph + .getAllIntermediateResults.get(intermediateDataSetId) + + if (intermediateResult != null) { + // Try to find the producing execution + val producerExecution = intermediateResult + .getPartitionById(resultPartitionId.getPartitionId) + .getProducer + .getCurrentExecutionAttempt + + if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) { + sender ! decorateMessage(producerExecution.getState) + } else { + val cause = new PartitionProducerDisposedException(resultPartitionId) + sender ! decorateMessage(Status.Failure(cause)) + } + } else { + val cause = new IllegalArgumentException( + s"Intermediate data set with ID $intermediateDataSetId not found.") + sender ! decorateMessage(Status.Failure(cause)) + } + } + } catch { + case e: Exception => + sender ! decorateMessage( + Status.Failure(new RuntimeException("Failed to look up execution state of " + + s"producer with ID ${resultPartitionId.getProducerId}.", e))) + } - if (execution != null) execution.getState else null case None => - // Nothing to do. 
This is not an error, because the request is received when a sending - // task fails or is not yet available during a remote partition request. - log.debug(s"Cannot find execution graph for job $jobId.") + sender ! decorateMessage( + Status.Failure(new IllegalArgumentException(s"Job with ID $jobId not found."))) - null } - sender ! decorateMessage( - new PartitionState( - taskResultId, - partitionId.getPartitionId, - state) - ) - case RequestJobStatus(jobID) => currentJobs.get(jobID) match { case Some((executionGraph,_)) => @@ -1059,7 +1089,7 @@ class JobManager( taskManagerMap.get(taskManagerActorRef) match { case Some(instanceId) => handleTaskManagerTerminated(taskManagerActorRef, instanceId) case None => log.debug("Received terminated message for task manager " + - s"${taskManagerActorRef} which is not " + + s"$taskManagerActorRef which is not " + "connected to this job manager.") } @@ -2092,7 +2122,7 @@ object JobManager { def sleepBeforeRetry() : Unit = { if (maxSleepBetweenRetries > 0) { val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long] - LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.") + LOG.info(s"Retrying after bind exception. 
Sleeping for $sleepTime ms.") Thread.sleep(sleepTime) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 3d72f1a..65819f4 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -157,20 +157,18 @@ object JobManagerMessages { case class NextInputSplit(splitData: Array[Byte]) /** - * Requests the current state of the partition. - * - * The state of a partition is currently bound to the state of the producing execution. - * - * @param jobId The job ID of the job, which produces the partition. - * @param partitionId The partition ID of the partition to request the state of. - * @param taskExecutionId The execution attempt ID of the task requesting the partition state. - * @param taskResultId The input gate ID of the task requesting the partition state. - */ - case class RequestPartitionState( + * Requests the execution state of the execution producing a result partition. + * + * @param jobId ID of the job the partition belongs to. + * @param intermediateDataSetId ID of the parent intermediate data set. + * @param resultPartitionId ID of the result partition to check. This + * identifies the producing execution and + * partition. 
+ */ + case class RequestPartitionProducerState( jobId: JobID, - partitionId: ResultPartitionID, - taskExecutionId: ExecutionAttemptID, - taskResultId: IntermediateDataSetID) + intermediateDataSetId: IntermediateDataSetID, + resultPartitionId: ResultPartitionID) extends RequiresLeaderSessionID /** http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 271578f..41d3077 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -49,7 +49,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync} import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher} -import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionStateChecker} +import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionProducerStateChecker} import org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier, ResultPartitionManager} import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService} import org.apache.flink.runtime.memory.MemoryManager @@ -181,7 +181,7 @@ class TaskManager( private var connectionUtils: Option[( CheckpointResponder, - PartitionStateChecker, + PartitionProducerStateChecker, ResultPartitionConsumableNotifier, TaskManagerConnection)] = None @@ -916,7 +916,7 @@ class 
TaskManager( val taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway) - val partitionStateChecker = new ActorGatewayPartitionStateChecker( + val partitionStateChecker = new ActorGatewayPartitionProducerStateChecker( jobManagerGateway, config.timeout) http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java index 6570679..8cae04c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; @@ -60,7 +59,6 @@ public class InputGateConcurrentTest { final SingleInputGate gate = new SingleInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), @@ -96,7 +94,6 @@ public class InputGateConcurrentTest { final SingleInputGate gate = new SingleInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, @@ -146,7 +143,6 @@ public class InputGateConcurrentTest { final SingleInputGate gate = new SingleInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 
numChannels, http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index b35612a..7e1d792 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; @@ -87,7 +86,6 @@ public class InputGateFairnessTest { SingleInputGate gate = new FairnessVerifyingInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), @@ -141,7 +139,6 @@ public class InputGateFairnessTest { SingleInputGate gate = new FairnessVerifyingInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), @@ -192,7 +189,6 @@ public class InputGateFairnessTest { SingleInputGate gate = new FairnessVerifyingInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numChannels, mock(TaskActions.class), @@ -248,7 +244,6 @@ public class InputGateFairnessTest { SingleInputGate gate = new FairnessVerifyingInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 
numChannels, mock(TaskActions.class), @@ -346,14 +341,13 @@ public class InputGateFairnessTest { public FairnessVerifyingInputGate( String owningTaskName, JobID jobId, - ExecutionAttemptID executionId, IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels, TaskActions taskActions, TaskIOMetricGroup metrics) { - super(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, + super(owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex, numberOfInputChannels, taskActions, metrics); try { http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 35ed4c3..37ec751 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.Lists; import org.apache.flink.api.common.JobID; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -283,7 +282,6 @@ public class LocalInputChannelTest { final SingleInputGate gate = new SingleInputGate( "test task name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 1, @@ -481,7 +479,6 @@ 
public class LocalInputChannelTest { this.inputGate = new SingleInputGate( "Test Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), subpartitionIndex, numberOfInputChannels, http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 7cae362..a25b8d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -73,7 +73,7 @@ public class SingleInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final SingleInputGate inputGate = new SingleInputGate( - "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + "Test Task Name", new JobID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); final TestInputChannel[] inputChannels = new TestInputChannel[]{ new TestInputChannel(inputGate, 0), @@ -127,7 +127,7 @@ public class SingleInputGateTest { // Setup reader with one local and one unknown input channel final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + final SingleInputGate inputGate = new SingleInputGate("Test Task 
Name", new JobID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); final BufferPool bufferPool = mock(BufferPool.class); when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2); @@ -178,7 +178,6 @@ public class SingleInputGateTest { SingleInputGate inputGate = new SingleInputGate( "t1", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 1, @@ -218,7 +217,6 @@ public class SingleInputGateTest { final SingleInputGate inputGate = new SingleInputGate( "InputGate", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 1, http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 126a96e..fe3b087 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; @@ -55,7 +54,6 @@ public class TestSingleInputGate { SingleInputGate realGate = new SingleInputGate( "Test Task Name", new JobID(), - new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, 
http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 84ec202..c05df0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.runtime.taskmanager.TaskActions; @@ -43,8 +42,8 @@ public class UnionInputGateTest { public void testBasicGetNextLogic() throws Exception { // Setup final String testTaskName = "Test Task"; - final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); - final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new 
IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2}); http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index f941c24..6d8c70b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.io.network.PartitionState; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -56,7 +55,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; +import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState; import org.apache.flink.runtime.messages.JobManagerMessages.StopJob; import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure; import 
org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess; @@ -116,7 +115,6 @@ import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingClu import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -221,49 +219,227 @@ public class JobManagerTest { // - The test ---------------------------------------------------------------------- // 1. All execution states - RequestPartitionState request = new RequestPartitionState( - jid, partitionId, receiver, rid); + RequestPartitionProducerState request = new RequestPartitionProducerState( + jid, rid, partitionId); for (ExecutionState state : ExecutionState.values()) { ExecutionGraphTestUtils.setVertexState(vertex, state); - Future<PartitionState> futurePartitionState = jobManagerGateway + Future<ExecutionState> futurePartitionState = jobManagerGateway .ask(request, getRemainingTime()) - .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class)); + .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class)); - PartitionState resp = Await.result(futurePartitionState, getRemainingTime()); - - assertEquals(request.taskResultId(), resp.getIntermediateDataSetID()); - assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID()); - assertEquals(state, resp.getExecutionState()); + ExecutionState resp = Await.result(futurePartitionState, getRemainingTime()); + assertEquals(state, resp); } // 2. 
Non-existing execution - request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid); + request = new RequestPartitionProducerState(jid, rid, new ResultPartitionID()); + + Future<?> futurePartitionState = jobManagerGateway.ask(request, getRemainingTime()); + try { + Await.result(futurePartitionState, getRemainingTime()); + fail("Did not fail with expected RuntimeException"); + } catch (RuntimeException e) { + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + } - Future<PartitionState> futurePartitionState = jobManagerGateway - .ask(request, getRemainingTime()) - .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class)); + // 3. Non-existing job + request = new RequestPartitionProducerState(new JobID(), rid, new ResultPartitionID()); + futurePartitionState = jobManagerGateway.ask(request, getRemainingTime()); + + try { + Await.result(futurePartitionState, getRemainingTime()); + fail("Did not fail with expected IllegalArgumentException"); + } catch (IllegalArgumentException ignored) { + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + }; + }}; + } - PartitionState resp = Await.result(futurePartitionState, getRemainingTime()); + /** + * Tests the JobManager response when the execution is not registered with + * the ExecutionGraph. + */ + @Test + public void testRequestPartitionStateUnregisteredExecution() throws Exception { + new JavaTestKit(system) {{ + new Within(duration("15 seconds")) { + @Override + protected void run() { + // Setup + TestingCluster cluster = null; - assertEquals(request.taskResultId(), resp.getIntermediateDataSetID()); - assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID()); - assertNull(resp.getExecutionState()); + try { + cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT()); - // 3. 
Non-existing job - request = new RequestPartitionState( - new JobID(), new ResultPartitionID(), receiver, rid); + final IntermediateDataSetID rid = new IntermediateDataSetID(); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(1); + sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish + sender.createAndAddResultDataSet(rid, PIPELINED); + + final JobVertex sender2 = new JobVertex("Blocking Sender"); + sender2.setParallelism(1); + sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED); + + final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2); + final JobID jid = jobGraph.getJobID(); + + final ActorGateway jobManagerGateway = cluster.getLeaderGateway( + TestingUtils.TESTING_DURATION()); + + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobSubmitSuccess.class); + + jobManagerGateway.tell( + new WaitForAllVerticesToBeRunningOrFinished(jid), + testActorGateway); + + expectMsgClass(AllVerticesRunning.class); + + Future<Object> egFuture = jobManagerGateway.ask( + new RequestExecutionGraph(jobGraph.getJobID()), remaining()); + + ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining()); + ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph(); + + ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0]; + while (vertex.getExecutionState() != ExecutionState.FINISHED) { + Thread.sleep(1); + } + + IntermediateResultPartition partition = vertex.getProducedPartitions() + .values().iterator().next(); + + 
ResultPartitionID partitionId = new ResultPartitionID( + partition.getPartitionId(), + vertex.getCurrentExecutionAttempt().getAttemptId()); - futurePartitionState = jobManagerGateway + // Producer finished, request state + Object request = new RequestPartitionProducerState(jid, rid, partitionId); + + Future<ExecutionState> producerStateFuture = jobManagerGateway .ask(request, getRemainingTime()) - .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class)); + .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class)); + + assertEquals(ExecutionState.FINISHED, Await.result(producerStateFuture, getRemainingTime())); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + }; + }}; + } + + /** + * Tests the JobManager response when the execution is not registered with + * the ExecutionGraph anymore and a new execution attempt is available. + */ + @Test + public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception { + new JavaTestKit(system) {{ + new Within(duration("15 seconds")) { + @Override + protected void run() { + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + final IntermediateDataSetID rid = new IntermediateDataSetID(); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(1); + sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish + sender.createAndAddResultDataSet(rid, PIPELINED); - resp = Await.result(futurePartitionState, getRemainingTime()); + final JobVertex sender2 = new JobVertex("Blocking Sender"); + sender2.setParallelism(1); + sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED); - assertEquals(request.taskResultId(), resp.getIntermediateDataSetID()); - 
assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID()); - assertNull(resp.getExecutionState()); + final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2); + final JobID jid = jobGraph.getJobID(); + + final ActorGateway jobManagerGateway = cluster.getLeaderGateway( + TestingUtils.TESTING_DURATION()); + + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); + + jobManagerGateway.tell( + new WaitForAllVerticesToBeRunningOrFinished(jid), + testActorGateway); + + expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class); + + Future<Object> egFuture = jobManagerGateway.ask( + new RequestExecutionGraph(jobGraph.getJobID()), remaining()); + + ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining()); + ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph(); + + ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0]; + while (vertex.getExecutionState() != ExecutionState.FINISHED) { + Thread.sleep(1); + } + + IntermediateResultPartition partition = vertex.getProducedPartitions() + .values().iterator().next(); + + ResultPartitionID partitionId = new ResultPartitionID( + partition.getPartitionId(), + vertex.getCurrentExecutionAttempt().getAttemptId()); + + // Reset execution => new execution attempt + vertex.resetForNewExecution(); + + // Producer finished, request state + Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId); + + Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime()); + + try { + 
Await.result(producerStateFuture, getRemainingTime()); + fail("Did not fail with expected Exception"); + } catch (PartitionProducerDisposedException ignored) { + } } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index e37467b..a7ffa1a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -145,7 +145,7 @@ public class TaskAsyncCallTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); - PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class); + PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); Executor executor = mock(Executor.class); NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class); when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager); @@ -191,7 +191,7 @@ public class TaskAsyncCallTest { new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class), consumableNotifier, - partitionStateChecker, + partitionProducerStateChecker, executor); } http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index fd9ff05..5ccf8a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.network.ConnectionID; -import org.apache.flink.runtime.io.network.PartitionState; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -102,7 +101,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; +import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState; import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertNotNull; @@ -1579,15 +1578,8 @@ public class TaskManagerTest extends TestLogger { @Override public void handleMessage(Object message) throws Exception { - if (message instanceof RequestPartitionState) { - final RequestPartitionState msg = (RequestPartitionState) message; - - PartitionState resp = new PartitionState( - msg.taskResultId(), - msg.partitionId().getPartitionId(), - ExecutionState.RUNNING); - - getSender().tell(decorateMessage(resp), getSelf()); + if (message instanceof RequestPartitionProducerState) { + getSender().tell(decorateMessage(ExecutionState.RUNNING), getSelf()); } else if (message instanceof TaskMessages.UpdateTaskExecutionState) { final TaskExecutionState msg = ((TaskMessages.UpdateTaskExecutionState) message) http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index d80dab3..ae7d0b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.jobgraph.JobVertexID; import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -95,7 +95,7 @@ public class TaskStopTest { tmRuntimeInfo, mock(TaskMetricGroup.class), mock(ResultPartitionConsumableNotifier.class), - mock(PartitionStateChecker.class), + mock(PartitionProducerStateChecker.class), mock(Executor.class)); Field f = task.getClass().getDeclaredField("invokable"); f.setAccessible(true);
