Repository: flink
Updated Branches:
  refs/heads/master 5bc93e877 -> ad31f6111


[FLINK-1425] [streaming] Add scheduling of all tasks at once

This closes #330.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad31f611
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad31f611
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad31f611

Branch: refs/heads/master
Commit: ad31f611150b4b95147dca26932b7ad11bb4b920
Parents: 5bc93e8
Author: Ufuk Celebi <[email protected]>
Authored: Fri Jan 23 11:30:30 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Jan 26 11:12:57 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  | 43 +++++++++++++----
 .../apache/flink/runtime/jobgraph/JobGraph.java | 10 ++++
 .../flink/runtime/jobgraph/ScheduleMode.java    | 35 ++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  1 +
 .../runtime/jobmanager/JobManagerITCase.scala   | 49 +++++++++++++++++++-
 .../apache/flink/runtime/jobmanager/Tasks.scala | 27 +++++++++++
 .../testingUtils/TestingJobManager.scala        |  6 +--
 7 files changed, 157 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 578cdb2..54c5d3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -113,6 +114,8 @@ public class ExecutionGraph implements Serializable {
 
        private boolean allowQueuedScheduling = true;
 
+       private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
+
        public ExecutionGraph(JobID jobId, String jobName, Configuration 
jobConfig, FiniteDuration timeout) {
                this(jobId, jobName, jobConfig, timeout, new 
ArrayList<BlobKey>());
        }
@@ -311,6 +314,14 @@ public class ExecutionGraph implements Serializable {
                this.allowQueuedScheduling = allowed;
        }
 
+       public void setScheduleMode(ScheduleMode scheduleMode) {
+               this.scheduleMode = scheduleMode;
+       }
+
+       public ScheduleMode getScheduleMode() {
+               return scheduleMode;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Actions
        // 
--------------------------------------------------------------------------------------------
@@ -326,14 +337,30 @@ public class ExecutionGraph implements Serializable {
                
                if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                        this.scheduler = scheduler;
-                       
-                       // initially, we simply take the ones without inputs.
-                       // next, we implement the logic to go back from 
vertices that need computation
-                       // to the ones we need to start running
-                       for (ExecutionJobVertex ejv : this.tasks.values()) {
-                               if (ejv.getJobVertex().isInputVertex()) {
-                                       ejv.scheduleAll(scheduler, 
allowQueuedScheduling);
-                               }
+
+                       switch (scheduleMode) {
+
+                               case FROM_SOURCES:
+                                       // initially, we simply take the ones 
without inputs.
+                                       // next, we implement the logic to go 
back from vertices that need computation
+                                       // to the ones we need to start running
+                                       for (ExecutionJobVertex ejv : 
this.tasks.values()) {
+                                               if 
(ejv.getJobVertex().isInputVertex()) {
+                                                       
ejv.scheduleAll(scheduler, allowQueuedScheduling);
+                                               }
+                                       }
+
+                                       break;
+
+                               case ALL:
+                                       for (ExecutionJobVertex ejv : 
getVerticesTopologically()) {
+                                               ejv.scheduleAll(scheduler, 
allowQueuedScheduling);
+                                       }
+
+                                       break;
+
+                               case BACKTRACKING:
+                                       throw new JobException("BACKTRACKING is 
currently not supported as schedule mode.");
                        }
                }
                else {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index a288357..3561420 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -72,6 +72,8 @@ public class JobGraph implements Serializable {
        
        /** flag to enable queued scheduling */
        private boolean allowQueuedScheduling;
+
+       private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -198,6 +200,14 @@ public class JobGraph implements Serializable {
                return allowQueuedScheduling;
        }
 
+       public void setScheduleMode(ScheduleMode scheduleMode) {
+               this.scheduleMode = scheduleMode;
+       }
+
+       public ScheduleMode getScheduleMode() {
+               return scheduleMode;
+       }
+
        /**
         * Adds a new task vertex to the job graph if it is not already 
included.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
new file mode 100644
index 0000000..330519d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+public enum ScheduleMode {
+
+       /**
+        * Schedule tasks from sources to sinks with lazy deployment of receiving tasks.
+        */
+       FROM_SOURCES,
+
+       BACKTRACKING,
+
+       /**
+        * Schedule tasks all at once instead of lazy deployment of receiving tasks.
+        */
+       ALL
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 37a41a5..87c9745 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -201,6 +201,7 @@ class JobManager(val configuration: Configuration)
                 jobGraph.getJobID, jobGraph.getName)
             }
 
+            executionGraph.setScheduleMode(jobGraph.getScheduleMode)
             
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
 
             // get notified about job status changes

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index c324dc5..4ca3d19 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -22,14 +22,14 @@ import Tasks._
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph}
-import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException
+import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph, ScheduleMode}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import scheduler.{NoResourceAvailableException, SlotSharingGroup}
 
 import scala.concurrent.duration._
 
@@ -277,6 +277,51 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       }
     }
 
+    "support scheduling all at once" in {
+      val num_tasks = 16
+      val sender = new AbstractJobVertex("Sender")
+      val forwarder = new AbstractJobVertex("Forwarder")
+      val receiver = new AbstractJobVertex("Receiver")
+
+      sender.setInvokableClass(classOf[Sender])
+      forwarder.setInvokableClass(classOf[Forwarder])
+      receiver.setInvokableClass(classOf[AgnosticReceiver])
+
+      sender.setParallelism(num_tasks)
+      forwarder.setParallelism(num_tasks)
+      receiver.setParallelism(num_tasks)
+
+      val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
+      sender.setSlotSharingGroup(sharingGroup)
+      forwarder.setSlotSharingGroup(sharingGroup)
+      receiver.setSlotSharingGroup(sharingGroup)
+
+      forwarder.connectNewDataSetAsInput(sender, 
DistributionPattern.ALL_TO_ALL)
+      receiver.connectNewDataSetAsInput(forwarder, 
DistributionPattern.ALL_TO_ALL)
+
+      val jobGraph = new JobGraph("Forwarding Job", sender, forwarder, 
receiver)
+
+      jobGraph.setScheduleMode(ScheduleMode.ALL);
+
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 1)
+      val jm = cluster.getJobManager
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          jm ! SubmitJob(jobGraph)
+
+          expectMsg(SubmissionSuccess(jobGraph.getJobID))
+
+          expectMsgType[JobResultSuccess]
+
+          jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
+          expectMsg(true)
+        }
+      } finally {
+        cluster.stop()
+      }
+    }
+
     "handle job with a failing sender vertex" in {
       val num_tasks = 100
       val sender = new AbstractJobVertex("Sender")

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 316f340..42832b4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -68,6 +68,33 @@ object Tasks {
     }
   }
 
+  class Forwarder extends AbstractInvokable {
+    var reader: RecordReader[IntegerRecord] = _
+    var writer: RecordWriter[IntegerRecord] = _
+    override def registerInputOutput(): Unit = {
+      reader = new RecordReader[IntegerRecord](getEnvironment.getReader(0), 
classOf[IntegerRecord])
+      writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
+    }
+
+    override def invoke(): Unit = {
+      try {
+        while (true) {
+          val record = reader.next();
+
+          if (record == null) {
+            return;
+          }
+
+          writer.emit(record);
+        }
+
+        writer.flush()
+      } finally {
+        writer.clearBuffers()
+      }
+    }
+  }
+
   class Receiver extends AbstractInvokable {
     var reader: RecordReader[IntegerRecord] = _
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad31f611/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 3b86a35..bd5bc50 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -24,13 +24,11 @@ import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
-import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
-ExecutionStateChanged}
+import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved
 
 import scala.collection.convert.WrapAsScala
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 
 
 trait TestingJobManager extends ActorLogMessages with WrapAsScala {

Reply via email to