[FLINK-1415] [runtime] Clean up archiving of ExecutionGraphs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ae0dc2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ae0dc2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ae0dc2d Branch: refs/heads/master Commit: 8ae0dc2d768aecfa3129df553f43d827792b65d7 Parents: db1b8b9 Author: Stephan Ewen <[email protected]> Authored: Wed Feb 4 13:40:13 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Feb 5 12:17:15 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 35 ++++++++--- .../runtime/executiongraph/ExecutionGraph.java | 51 ++++++++++++---- .../executiongraph/ExecutionJobVertex.java | 41 +++++++++++-- .../runtime/executiongraph/ExecutionVertex.java | 50 ++++++++++++---- .../apache/flink/runtime/instance/Instance.java | 62 ++++++++++---------- .../flink/runtime/instance/SharedSlot.java | 4 +- .../flink/runtime/instance/SimpleSlot.java | 6 +- .../org/apache/flink/runtime/instance/Slot.java | 32 ++++++---- .../scheduler/CoLocationConstraint.java | 6 +- .../scheduler/NoResourceAvailableException.java | 9 ++- .../runtime/jobmanager/scheduler/Scheduler.java | 1 - .../scheduler/SlotAvailabilityListener.java | 4 +- .../jobmanager/web/JobManagerInfoServlet.java | 27 ++++----- .../runtime/jobmanager/web/JsonFactory.java | 7 +-- .../runtime/jobmanager/web/WebInfoServer.java | 6 -- .../flink/runtime/jobmanager/JobManager.scala | 5 +- .../runtime/jobmanager/MemoryArchivist.scala | 14 ++--- .../runtime/messages/ArchiveMessages.scala | 9 +++ .../runtime/messages/JobmanagerMessages.scala | 31 ++++++++++ .../test/javaApiOperators/DataSinkITCase.java | 2 +- 20 files changed, 275 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index e1a24c4..27977c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -22,12 +22,14 @@ import akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; + import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.PartitionInfo; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.AllocatedSlot; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; @@ -40,6 +42,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; + import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -78,7 +81,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED; */ public class Execution implements Serializable { - static final long serialVersionUID = 42L; + private static final long serialVersionUID = 42L; private static final AtomicReferenceFieldUpdater<Execution, 
ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state"); @@ -97,22 +100,25 @@ public class Execution implements Serializable { private final int attemptNumber; - public FiniteDuration timeout; + private final FiniteDuration timeout; private volatile ExecutionState state = CREATED; - private volatile SimpleSlot assignedResource; // once assigned, never changes + private volatile SimpleSlot assignedResource; // once assigned, never changes until the execution is archived private volatile Throwable failureCause; // once assigned, never changes + private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution + // -------------------------------------------------------------------------------------------- - public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, - FiniteDuration timeout) { + public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) { + checkArgument(attemptNumber >= 0); + this.vertex = checkNotNull(vertex); this.attemptId = new ExecutionAttemptID(); - checkArgument(attemptNumber >= 0); + this.attemptNumber = attemptNumber; this.stateTimestamps = new long[ExecutionState.values().length]; @@ -145,6 +151,10 @@ public class Execution implements Serializable { return assignedResource; } + public InstanceConnectionInfo getAssignedResourceLocation() { + return assignedResourceLocation; + } + public Throwable getFailureCause() { return failureCause; } @@ -161,6 +171,16 @@ public class Execution implements Serializable { return state == FINISHED || state == FAILED || state == CANCELED; } + /** + * This method cleans fields that are irrelevant for the archived execution attempt. 
+ */ + public void prepareForArchiving() { + if (assignedResource != null && assignedResource.isAlive()) { + throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running."); + } + assignedResource = null; + } + // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- @@ -267,6 +287,7 @@ public class Execution implements Serializable { throw new JobException("Could not assign the ExecutionVertex to the slot " + slot); } this.assignedResource = slot; + this.assignedResourceLocation = slot.getInstance().getInstanceConnectionInfo(); // race double check, did we fail/cancel and do we need to release the slot? if (this.state != DEPLOYING) { http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 54c5d3a..3f857e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import akka.actor.ActorRef; + import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; @@ -36,6 +37,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -54,7 +56,7 @@ import static 
akka.dispatch.Futures.future; public class ExecutionGraph implements Serializable { - static final long serialVersionUID = 42L; + private static final long serialVersionUID = 42L; private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state"); @@ -71,10 +73,10 @@ public class ExecutionGraph implements Serializable { private final String jobName; /** The job configuration that was originally attached to the JobGraph. */ - private transient final Configuration jobConfiguration; + private final Configuration jobConfiguration; /** The classloader of the user code. */ - private final ClassLoader userClassLoader; + private ClassLoader userClassLoader; /** All job vertices that are part of this graph */ private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks; @@ -83,20 +85,20 @@ public class ExecutionGraph implements Serializable { private final List<ExecutionJobVertex> verticesInCreationOrder; /** All intermediate results that are part of this graph */ - private transient final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults; + private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults; /** The currently executed tasks, for callbacks */ - private transient final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions; + private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions; - private transient final List<BlobKey> requiredJarFiles; + private final List<BlobKey> requiredJarFiles; - private transient final List<ActorRef> jobStatusListenerActors; + private final List<ActorRef> jobStatusListenerActors; - private transient final List<ActorRef> executionListenerActors; + private final List<ActorRef> executionListenerActors; private final long[] stateTimestamps; - private transient final Object progressLock = new Object(); + private final Object 
progressLock = new Object(); private int nextVertexToFinish; @@ -110,12 +112,13 @@ public class ExecutionGraph implements Serializable { private volatile Throwable failureCause; - private transient Scheduler scheduler; + private Scheduler scheduler; private boolean allowQueuedScheduling = true; private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; + public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) { this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>()); } @@ -598,8 +601,11 @@ public class ExecutionGraph implements Serializable { public void restart() { try { if (state == JobStatus.FAILED) { - transitionState(JobStatus.FAILED, JobStatus.RESTARTING); + if (!transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) { + throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart."); + } } + synchronized (progressLock) { if (state != JobStatus.RESTARTING) { throw new IllegalStateException("Can only restart job from state restarting."); @@ -627,4 +633,27 @@ public class ExecutionGraph implements Serializable { fail(t); } } + + /** + * This method cleans fields that are irrelevant for the archived execution attempt. 
+ */ + public void prepareForArchiving() { + if (!state.isTerminalState()) { + throw new IllegalStateException("Can only archive the job from a terminal state"); + } + + userClassLoader = null; + + for (ExecutionJobVertex vertex : verticesInCreationOrder) { + vertex.prepareForArchiving(); + } + + intermediateResults.clear(); + currentExecutions.clear(); + requiredJarFiles.clear(); + jobStatusListenerActors.clear(); + executionListenerActors.clear(); + + scheduler = null; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 7eaa1e6..e2febc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -42,12 +42,13 @@ import scala.concurrent.duration.FiniteDuration; public class ExecutionJobVertex implements Serializable { - static final long serialVersionUID = 42L; + + private static final long serialVersionUID = 42L; /** Use the same log for all ExecutionGraph classes */ private static final Logger LOG = ExecutionGraph.LOG; - private transient final Object stateMonitor = new Object(); + private final Object stateMonitor = new Object(); private final ExecutionGraph graph; @@ -55,9 +56,9 @@ public class ExecutionJobVertex implements Serializable { private final ExecutionVertex[] taskVertices; - private transient final IntermediateResult[] producedDataSets; + private IntermediateResult[] producedDataSets; - private transient final List<IntermediateResult> inputs; + private final List<IntermediateResult> inputs; private final int parallelism; @@ -71,7 
+72,7 @@ public class ExecutionJobVertex implements Serializable { private final InputSplit[] inputSplits; - private transient InputSplitAssigner splitAssigner; + private InputSplitAssigner splitAssigner; public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, @@ -308,6 +309,36 @@ public class ExecutionJobVertex implements Serializable { } } + /** + * This method cleans fields that are irrelevant for the archived execution attempt. + */ + public void prepareForArchiving() { + + for (ExecutionVertex vertex : taskVertices) { + vertex.prepareForArchiving(); + } + + // clear intermediate results + inputs.clear(); + producedDataSets = null; + + // reset shared groups + if (slotSharingGroup != null) { + slotSharingGroup.clearTaskAssignment(); + } + if (coLocationGroup != null) { + coLocationGroup.resetConstraints(); + } + + // reset splits and split assigner + splitAssigner = null; + if (inputSplits != null) { + for (int i = 0; i < inputSplits.length; i++) { + inputSplits[i] = null; + } + } + } + //--------------------------------------------------------------------------------------------- // Notifications //--------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index d3993bb..1092f89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import 
org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; @@ -37,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.slf4j.Logger; + import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -56,9 +58,8 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; */ public class ExecutionVertex implements Serializable { - static final long serialVersionUID = 42L; + private static final long serialVersionUID = 42L; - @SuppressWarnings("unused") private static final Logger LOG = ExecutionGraph.LOG; private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8; @@ -67,9 +68,9 @@ public class ExecutionVertex implements Serializable { private final ExecutionJobVertex jobVertex; - private transient final IntermediateResultPartition[] resultPartitions; + private IntermediateResultPartition[] resultPartitions; - private transient ExecutionEdge[][] inputEdges; + private ExecutionEdge[][] inputEdges; private final int subTaskIndex; @@ -182,6 +183,10 @@ public class ExecutionVertex implements Serializable { return currentExecution.getAssignedResource(); } + public InstanceConnectionInfo getCurrentAssignedResourceLocation() { + return currentExecution.getAssignedResourceLocation(); + } + public ExecutionGraph getExecutionGraph() { return this.jobVertex.getGraph(); } @@ -322,6 +327,11 @@ public class ExecutionVertex implements Serializable { // -------------------------------------------------------------------------------------------- public void resetForNewExecution() { + + if (LOG.isDebugEnabled()) { + LOG.debug("Resetting exection vertex {} for new execution.", getSimpleName()); + } + synchronized 
(priorExecutions) { Execution execution = currentExecution; ExecutionState state = execution.getState(); @@ -369,6 +379,31 @@ public class ExecutionVertex implements Serializable { return currentExecution.scheduleOrUpdateConsumers(partition.getConsumers()); } + + /** + * This method cleans fields that are irrelevant for the archived execution attempt. + */ + public void prepareForArchiving() { + Execution execution = currentExecution; + ExecutionState state = execution.getState(); + + // sanity check + if (!(state == FINISHED || state == CANCELED || state == FAILED)) { + throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state."); + } + + // prepare the current execution for archiving + execution.prepareForArchiving(); + + // prepare previous executions for archiving + for (Execution exec : priorExecutions) { + exec.prepareForArchiving(); + } + + // clear the unnecessary fields in this class + this.resultPartitions = null; + this.inputEdges = null; + } // -------------------------------------------------------------------------------------------- // Notifications from the Execution Attempt @@ -447,13 +482,6 @@ public class ExecutionVertex implements Serializable { return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')'; } - /* - * Clears all Edges of this ExecutionVertex - */ - public void clearExecutionEdges() { - inputEdges = null; - } - @Override public String toString() { return getSimpleName(); http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index 4f9dc7f..a5a9263 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.instance; -import java.io.Serializable; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashSet; @@ -33,17 +32,16 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment; /** - * An taskManager represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on. + * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager} + * registered at a JobManager and ready to receive work. */ -public class Instance implements Serializable { - - static final long serialVersionUID = 42L; +public class Instance { /** The lock on which to synchronize allocations and failure state changes */ - private transient final Object instanceLock = new Object(); + private final Object instanceLock = new Object(); /** The actor ref to the task manager represented by this taskManager. */ - private transient final ActorRef taskManager; + private final ActorRef taskManager; /** The instance connection information for the data transfer. */ private final InstanceConnectionInfo connectionInfo; @@ -58,28 +56,27 @@ public class Instance implements Serializable { private final int numberOfSlots; /** A list of available slot positions */ - private transient final Queue<Integer> availableSlots; + private final Queue<Integer> availableSlots; /** Allocated slots on this taskManager */ private final Set<Slot> allocatedSlots = new HashSet<Slot>(); - /** A listener to be notified upon new slot availability */ - private transient SlotAvailabilityListener slotAvailabilityListener; + private SlotAvailabilityListener slotAvailabilityListener; - /** - * Time when last heat beat has been received from the task manager running on this taskManager. 
- */ + /** Time when last heat beat has been received from the task manager running on this taskManager. */ private volatile long lastReceivedHeartBeat = System.currentTimeMillis(); + /** Flag marking the instance as alive or as dead. */ private volatile boolean isDead; // -------------------------------------------------------------------------------------------- /** - * Constructs an abstract taskManager object. + * Constructs an instance reflecting a registered TaskManager. * * @param taskManager The actor reference of the represented task manager. + * @param connectionInfo The remote connection where the task manager receives requests. * @param id The id under which the taskManager is registered. * @param resources The resources available on the machine. * @param numberOfSlots The number of task slots offered by this taskManager. @@ -123,15 +120,23 @@ public class Instance implements Serializable { } public void markDead() { + + // create a copy of the slots to avoid concurrent modification exceptions + List<Slot> slots; + synchronized (instanceLock) { if (isDead) { return; } - isDead = true; // no more notifications for the slot releasing this.slotAvailabilityListener = null; + + slots = new ArrayList<Slot>(allocatedSlots); + + allocatedSlots.clear(); + availableSlots.clear(); } /* @@ -139,14 +144,9 @@ public class Instance implements Serializable { * owning the assignment group lock wants to give itself back to the instance which requires * the instance lock */ - for (Slot slot : allocatedSlots) { + for (Slot slot : slots) { slot.releaseSlot(); } - - synchronized (instanceLock) { - allocatedSlots.clear(); - availableSlots.clear(); - } } @@ -185,9 +185,9 @@ public class Instance implements Serializable { // -------------------------------------------------------------------------------------------- // Resource allocation // -------------------------------------------------------------------------------------------- - + public SimpleSlot 
allocateSimpleSlot(JobID jobID) throws InstanceDiedException { - return allocateSimpleSlot(jobID, jobID); + return allocateSimpleSlot(jobID, new AbstractID()); } public SimpleSlot allocateSimpleSlot(JobID jobID, AbstractID groupID) throws InstanceDiedException { @@ -211,8 +211,9 @@ public class Instance implements Serializable { } } - public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID) throws - InstanceDiedException { + public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment, AbstractID groupID) + throws InstanceDiedException + { // the slot needs to be in the returned to taskManager state if (jobID == null) { throw new IllegalArgumentException(); @@ -227,8 +228,7 @@ public class Instance implements Serializable { if (nextSlot == null) { return null; } else { - SharedSlot slot = new SharedSlot(jobID, this, nextSlot, - sharingGroupAssignment, null, groupID); + SharedSlot slot = new SharedSlot(jobID, this, nextSlot, sharingGroupAssignment, null, groupID); allocatedSlots.add(slot); return slot; } @@ -267,10 +267,10 @@ public class Instance implements Serializable { } public void cancelAndReleaseAllSlots() { - List<Slot> copy = null; - + + // we need to do this copy because of concurrent modification exceptions + List<Slot> copy; synchronized (instanceLock) { - // we need to do this copy because of concurrent modification exceptions copy = new ArrayList<Slot>(this.allocatedSlots); } @@ -329,7 +329,7 @@ public class Instance implements Serializable { @Override public String toString() { - return instanceId + " @" + (taskManager != null ? taskManager.path() : "ActorRef.noSender") + " " + + return instanceId + " @ " + (taskManager != null ? 
taskManager.path() : "ActorRef.noSender") + " - " + numberOfSlots + " slots" + " - " + hashCode(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java index 2efcf6c..576db22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java @@ -117,7 +117,7 @@ public class SharedSlot extends Slot { if(isDead()){ return null; } else { - SimpleSlot slot = new SimpleSlot(jobID, instance, subSlots.size(), this, jID); + SimpleSlot slot = new SimpleSlot(getJobID(), getInstance(), subSlots.size(), this, jID); subSlots.add(slot); return slot; @@ -128,7 +128,7 @@ public class SharedSlot extends Slot { if(isDead()){ return null; } else { - SharedSlot slot = new SharedSlot(jobID, instance, subSlots.size(), assignmentGroup, this, jID); + SharedSlot slot = new SharedSlot(getJobID(), getInstance(), subSlots.size(), assignmentGroup, this, jID); subSlots.add(slot); return slot; http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java index 5b1af57..1d42f1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -71,7 +71,7 @@ public class SimpleSlot extends Slot { } // check that we can actually run in this slot - 
if (status != ALLOCATED_AND_ALIVE) { + if (getStatus() != ALLOCATED_AND_ALIVE) { return false; } @@ -81,7 +81,7 @@ public class SimpleSlot extends Slot { } // we need to do a double check that we were not cancelled in the meantime - if (status != ALLOCATED_AND_ALIVE) { + if (getStatus() != ALLOCATED_AND_ALIVE) { this.executedTask = null; return false; } @@ -112,7 +112,7 @@ public class SimpleSlot extends Slot { getParent().disposeChild(this); } else { // we have to give back the slot to the owning instance - instance.returnAllocatedSlot(this); + getInstance().returnAllocatedSlot(this); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java index fb62c4c..2cf727c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -24,10 +24,12 @@ import org.apache.flink.runtime.jobgraph.JobID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** - * Base class for slots. + * Base class for task slots. TaskManagers offer one or more task slots, they define how many + * parallel tasks or task groups a TaskManager executes. */ public abstract class Slot { - protected static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER = + + private static final AtomicIntegerFieldUpdater<Slot> STATUS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Slot.class, "status"); protected static final int ALLOCATED_AND_ALIVE = 0; // tasks may be added and might be running @@ -35,25 +37,27 @@ public abstract class Slot { protected static final int RELEASED = 2; // has been given back to the instance /** The ID of the job this slice belongs to. 
*/ - protected final JobID jobID; + private final JobID jobID; + /** The id of the group that this slot is allocated to */ + private final AbstractID groupID; + /** The instance on which the slot is allocated */ - protected final Instance instance; + private final Instance instance; + + /** The parent of this slot in the hierarchy, or null, if this is the parent */ + private final SharedSlot parent; /** The number of the slot on which the task is deployed */ - protected final int slotNumber; + private final int slotNumber; /** The state of the vertex, only atomically updated */ - protected volatile int status = ALLOCATED_AND_ALIVE; + private volatile int status = ALLOCATED_AND_ALIVE; /** Indicates whether this slot was marked dead by the system */ - private boolean dead = false; + private volatile boolean dead = false; - private final AbstractID groupID; - - private final SharedSlot parent; - - private boolean disposed = false; + private volatile boolean disposed = false; public Slot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID) { @@ -102,6 +106,10 @@ public abstract class Slot { return parent.getRoot(); } } + + public int getStatus() { + return status; + } public abstract int getNumberLeaves(); http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java index 8ef61b9..902b5a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java @@ -24,11 +24,7 @@ import 
org.apache.flink.runtime.instance.Instance; import com.google.common.base.Preconditions; import org.apache.flink.runtime.instance.SharedSlot; -import java.io.Serializable; - -public class CoLocationConstraint implements Serializable { - - static final long serialVersionUID = 42L; +public class CoLocationConstraint { private final CoLocationGroup group; http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java index 93b4541..2b86c43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java @@ -54,7 +54,9 @@ public class NoResourceAvailableException extends JobException { public NoResourceAvailableException(String message) { super(message); } - + + // -------------------------------------------------------------------------------------------- + @Override public boolean equals(Object obj){ if(obj == null){ @@ -67,4 +69,9 @@ public class NoResourceAvailableException extends JobException { return getMessage().equals(((NoResourceAvailableException)obj).getMessage()); } } + + @Override + public int hashCode() { + return getMessage().hashCode(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index c237aa5..1ad7a50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -408,7 +408,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { * local instance. If no such instance exists (all slots occupied), then return null. * * @param requestedLocations - * @return */ private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations){ if (this.instancesWithAvailableResources.isEmpty()) { http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java index f75f294..0f02748 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAvailabilityListener.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmanager.scheduler; import org.apache.flink.runtime.instance.Instance; /** - * A SlotAvailabilityListener can be notified when new {@link org.apache.flink.runtime.instance.AllocatedSlot2}s become available - * on an {@link org.apache.flink.runtime.instance.Instance}. + * A SlotAvailabilityListener can be notified when new + * {@link org.apache.flink.runtime.instance.Slot}s become available on an {@link Instance}. 
*/ public interface SlotAvailabilityListener { http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java index 1492ae1..6de3b39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java @@ -34,16 +34,15 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import akka.actor.ActorRef; + import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs; -import org.apache.flink.runtime.messages.ArchiveMessages.RequestArchivedJobs$; +import org.apache.flink.runtime.messages.ArchiveMessages; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsResponse; import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestAccumulatorResults; @@ -64,6 +63,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.StringUtils; import org.eclipse.jetty.io.EofException; + import scala.concurrent.duration.FiniteDuration; public class JobManagerInfoServlet extends HttpServlet { @@ -94,7 +94,7 @@ public class JobManagerInfoServlet extends HttpServlet { try { if("archive".equals(req.getParameter("get"))) { List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(AkkaUtils - .<ArchivedJobs>ask(archive,RequestArchivedJobs$.MODULE$, timeout) + .<ArchivedJobs>ask(archive, ArchiveMessages.getRequestArchivedJobs(), timeout) .asJavaCollection()); writeJsonForArchive(resp.getWriter(), archivedJobs); @@ -129,9 +129,9 @@ public class JobManagerInfoServlet extends HttpServlet { } else if("taskmanagers".equals(req.getParameter("get"))) { int numberOfTaskManagers = AkkaUtils.<Integer>ask(jobmanager, - RequestNumberRegisteredTaskManager$.MODULE$, timeout); + JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout); int numberOfRegisteredSlots = AkkaUtils.<Integer>ask(jobmanager, - RequestTotalNumberOfSlots$.MODULE$, timeout); + JobManagerMessages.getRequestTotalNumberOfSlots(), timeout); resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagers +", " + "\"slots\": "+numberOfRegisteredSlots+"}"); @@ -149,7 +149,7 @@ public class JobManagerInfoServlet extends HttpServlet { } else{ Iterable<ExecutionGraph> runningJobs = AkkaUtils.<RunningJobs>ask - (jobmanager, RequestRunningJobs$.MODULE$, timeout).asJavaIterable(); + (jobmanager, JobManagerMessages.getRequestRunningJobs(), timeout).asJavaIterable(); writeJsonForJobs(resp.getWriter(), runningJobs); } @@ -292,17 +292,16 @@ public class JobManagerInfoServlet extends HttpServlet { boolean first = true; for (ExecutionVertex vertex : graph.getAllExecutionVertices()) { if 
(vertex.getExecutionState() == ExecutionState.FAILED) { - SimpleSlot slot = vertex.getCurrentAssignedResource(); + InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation(); Throwable failureCause = vertex.getFailureCause(); - if (slot != null || failureCause != null) { + if (location != null || failureCause != null) { if (first) { first = false; } else { wrt.write(","); } wrt.write("{"); - wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot - .getInstance().getInstanceConnectionInfo().getFQDNHostname()) + "\","); + wrt.write("\"node\": \"" + (location == null ? "(none)" : location.getFQDNHostname()) + "\","); wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))) + "\""); wrt.write("}"); } @@ -421,7 +420,7 @@ public class JobManagerInfoServlet extends HttpServlet { try { Iterable<ExecutionGraph> graphs = AkkaUtils.<RunningJobs>ask(jobmanager, - RequestRunningJobs$.MODULE$, timeout).asJavaIterable(); + JobManagerMessages.getRequestRunningJobs(), timeout).asJavaIterable(); //Serialize job to json wrt.write("{"); http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java index 8e46692..89e55d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import
org.apache.flink.runtime.executiongraph.IntermediateResult; -import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.util.StringUtils; import java.util.HashMap; @@ -38,9 +38,8 @@ public class JsonFactory { json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\","); json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\","); - SimpleSlot slot = vertex.getCurrentAssignedResource(); - String instanceName = slot == null ? "(null)" : slot.getInstance() - .getInstanceConnectionInfo().getFQDNHostname(); + InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation(); + String instanceName = location == null ? "(null)" : location.getFQDNHostname(); json.append("\"vertexinstancename\": \"" + instanceName + "\""); json.append("}"); http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java index 71347ee..6f3720e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java @@ -64,11 +64,6 @@ public class WebInfoServer { private final Server server; /** - * Timeout for akka requests - */ - private final FiniteDuration timeout; - - /** * Port for info server */ private int port; @@ -92,7 +87,6 @@ public class WebInfoServer { this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); - this.timeout = timeout; // get base path of Flink installation final String basePath = 
config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, ""); http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 532f7f8..79b9a74 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -460,7 +460,10 @@ class JobManager(val configuration: Configuration) */ private def removeJob(jobID: JobID): Unit = { currentJobs.remove(jobID) match { - case Some((eg, _)) => archive ! ArchiveExecutionGraph(jobID, eg) + case Some((eg, _)) => { + eg.prepareForArchiving() + archive ! ArchiveExecutionGraph(jobID, eg) + } case None => } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 5ca8fb3..28e960d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -42,13 +42,7 @@ ActorLogging { // wrap graph inside a soft reference graphs.update(jobID, new SoftReference(graph)) - // clear all execution edges of the graph - val iter = graph.getAllExecutionVertices().iterator() - while (iter.hasNext) { - iter.next().clearExecutionEdges() - } - - cleanup(jobID) + trimHistory() } case RequestArchivedJobs => { @@ -89,17 +83,17 @@ 
ActorLogging { case Some(graph) => graph case None => null } + case None => null } /** * Remove old ExecutionGraphs belonging to a jobID * * if more than max_entries are in the queue. - * @param jobID */ - private def cleanup(jobID: JobID): Unit = { + private def trimHistory(): Unit = { while (graphs.size > max_entries) { // get first graph inserted - val (jobID, value) = graphs.iterator.next() + val (jobID, value) = graphs.head graphs.remove(jobID) } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala index 03a5351..884bc2a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobID * This object contains the archive specific messages. 
*/ object ArchiveMessages { + case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph) /** @@ -47,4 +48,12 @@ object ArchiveMessages { jobs.asJavaCollection } } + + // -------------------------------------------------------------------------- + // Utility methods to allow simpler case object access from Java + // -------------------------------------------------------------------------- + + def getRequestArchivedJobs() : AnyRef = { + RequestArchivedJobs + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala index fbacbd2..7ce013b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala @@ -326,4 +326,35 @@ object JobManagerMessages { case object JobManagerStatusAlive extends JobManagerStatus + // -------------------------------------------------------------------------- + // Utility methods to allow simpler case object access from Java + // -------------------------------------------------------------------------- + + def getRequestNumberRegisteredTaskManager() : AnyRef = { + RequestNumberRegisteredTaskManager + } + + def getRequestTotalNumberOfSlots() : AnyRef = { + RequestTotalNumberOfSlots + } + + def getRequestBlobManagerPort() : AnyRef = { + RequestBlobManagerPort + } + + def getRequestRunningJobs() : AnyRef = { + RequestRunningJobs + } + + def getRequestRegisteredTaskManagers() : AnyRef = { + RequestRegisteredTaskManagers + } + + def getRequestJobManagerStatus() : AnyRef = { + RequestJobManagerStatus + } + + def getJobManagerStatusAlive() : AnyRef = { + 
JobManagerStatusAlive + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ae0dc2d/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java index ca9dc16..d8663e8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java @@ -41,7 +41,7 @@ import static org.junit.Assert.assertTrue; /** * Tests for data sinks */ - +@SuppressWarnings("serial") @RunWith(Parameterized.class) public class DataSinkITCase extends MultipleProgramsTestBase {
