Repository: flink
Updated Branches:
  refs/heads/master 5bc93e877 -> ad31f6111


[FLINK-1425] [streaming] Add scheduling of all tasks at once

This closes #330.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad31f611
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad31f611
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad31f611

Branch: refs/heads/master
Commit: ad31f611150b4b95147dca26932b7ad11bb4b920
Parents: 5bc93e8
Author: Ufuk Celebi <[email protected]>
Authored: Fri Jan 23 11:30:30 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Jan 26 11:12:57 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 43 +++++++++++++----
 .../apache/flink/runtime/jobgraph/JobGraph.java | 10 ++++
 .../flink/runtime/jobgraph/ScheduleMode.java    | 35 ++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  1 +
 .../runtime/jobmanager/JobManagerITCase.scala   | 49 +++++++++++++++++++-
 .../apache/flink/runtime/jobmanager/Tasks.scala | 27 +++++++++++
 .../testingUtils/TestingJobManager.scala        |  6 +--
 7 files changed, 157 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 578cdb2..54c5d3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -113,6 +114,8 @@ public class ExecutionGraph implements Serializable {
 
 	private boolean allowQueuedScheduling = true;
 
+	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
+
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
 		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
 	}
@@ -311,6 +314,14 @@ public class ExecutionGraph implements Serializable {
 		this.allowQueuedScheduling = allowed;
 	}
 
+	public void setScheduleMode(ScheduleMode scheduleMode) {
+		this.scheduleMode = scheduleMode;
+	}
+
+	public ScheduleMode getScheduleMode() {
+		return scheduleMode;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -326,14 +337,30 @@ public class ExecutionGraph implements Serializable {
 
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
 			this.scheduler = scheduler;
-
-			// initially, we simply take the ones without inputs.
-			// next, we implement the logic to go back from vertices that need computation
-			// to the ones we need to start running
-			for (ExecutionJobVertex ejv : this.tasks.values()) {
-				if (ejv.getJobVertex().isInputVertex()) {
-					ejv.scheduleAll(scheduler, allowQueuedScheduling);
-				}
+
+			switch (scheduleMode) {
+
+				case FROM_SOURCES:
+					// initially, we simply take the ones without inputs.
+					// next, we implement the logic to go back from vertices that need computation
+					// to the ones we need to start running
+					for (ExecutionJobVertex ejv : this.tasks.values()) {
+						if (ejv.getJobVertex().isInputVertex()) {
+							ejv.scheduleAll(scheduler, allowQueuedScheduling);
+						}
+					}
+
+					break;
+
+				case ALL:
+					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+						ejv.scheduleAll(scheduler, allowQueuedScheduling);
+					}
+
+					break;
+
+				case BACKTRACKING:
+					throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
 			}
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index a288357..3561420 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -72,6 +72,8 @@ public class JobGraph implements Serializable {
 
 	/** flag to enable queued scheduling */
 	private boolean allowQueuedScheduling;
+
+	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -198,6 +200,14 @@ public class JobGraph implements Serializable {
 		return allowQueuedScheduling;
 	}
 
+	public void setScheduleMode(ScheduleMode scheduleMode) {
+		this.scheduleMode = scheduleMode;
+	}
+
+	public ScheduleMode getScheduleMode() {
+		return scheduleMode;
+	}
+
 	/**
 	 * Adds a new task vertex to the job graph if it is not already included.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
new file mode 100644
index 0000000..330519d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+public enum ScheduleMode {
+
+	/**
+	 * Schedule tasks from sources to sinks with lazy deployment of receiving tasks.
+	 */
+	FROM_SOURCES,
+
+	BACKTRACKING,
+
+	/**
+	 * Schedule tasks all at once instead of lazy deployment of receiving tasks.
+	 */
+	ALL
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 37a41a5..87c9745 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -201,6 +201,7 @@ class JobManager(val configuration: Configuration)
               jobGraph.getJobID, jobGraph.getName)
           }
 
+          executionGraph.setScheduleMode(jobGraph.getScheduleMode)
           executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
 
           // get notified about job status changes

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index c324dc5..4ca3d19 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -22,14 +22,14 @@ import Tasks._
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
+import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import scheduler.{NoResourceAvailableException, SlotSharingGroup}
 
 import scala.concurrent.duration._
 
@@ -277,6 +277,51 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       }
     }
 
+    "support scheduling all at once" in {
+      val num_tasks = 16
+      val sender = new AbstractJobVertex("Sender")
+      val forwarder = new AbstractJobVertex("Forwarder")
+      val receiver = new AbstractJobVertex("Receiver")
+
+      sender.setInvokableClass(classOf[Sender])
+      forwarder.setInvokableClass(classOf[Forwarder])
+      receiver.setInvokableClass(classOf[AgnosticReceiver])
+
+      sender.setParallelism(num_tasks)
+      forwarder.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
+
+      val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
+      sender.setSlotSharingGroup(sharingGroup)
+      forwarder.setSlotSharingGroup(sharingGroup)
+      receiver.setSlotSharingGroup(sharingGroup)
+
+      forwarder.connectNewDataSetAsInput(sender, DistributionPattern.ALL_TO_ALL)
+      receiver.connectNewDataSetAsInput(forwarder, DistributionPattern.ALL_TO_ALL)
+
+      val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, receiver)
+
+      jobGraph.setScheduleMode(ScheduleMode.ALL);
+
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
+      val jm = cluster.getJobManager
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+
+          expectMsgType[JobResultSuccess]
+
+          jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+          expectMsg(true)
+        }
+      } finally {
+        cluster.stop()
+      }
+    }
+
     "handle job with a failing sender vertex" in {
       val num_tasks = 100
       val sender = new AbstractJobVertex("Sender")

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 316f340..42832b4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -68,6 +68,33 @@ object Tasks {
     }
   }
 
+  class Forwarder extends AbstractInvokable {
+    var reader: RecordReader[IntegerRecord] = _
+    var writer: RecordWriter[IntegerRecord] = _
+    override def registerInputOutput(): Unit = {
+      reader = new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
+      writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+    }
+
+    override def invoke(): Unit = {
+      try {
+        var record = reader.next()
+
+        // Forward records until the reader signals end of input with null.
+        while (record != null) {
+          writer.emit(record)
+          record = reader.next()
+        }
+
+        // Reached once the input is exhausted, so records still held by the
+        // writer are flushed before clearBuffers() runs in the finally block.
+        writer.flush()
+      } finally {
+        writer.clearBuffers()
+      }
+    }
+  }
+
   class Receiver extends AbstractInvokable {
     var reader: RecordReader[IntegerRecord] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 3b86a35..bd5bc50 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -24,13 +24,11 @@ import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
-import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
-ExecutionStateChanged}
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved
 
 import scala.collection.convert.WrapAsScala
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 
 trait TestingJobManager extends ActorLogMessages with WrapAsScala {
 
