[FLINK-1415] [runtime] Akka cleanups Replace akka.jobmanager.url by a non-exposed mechanism. Add heuristics to calculate different timeouts based on a single value.
Harmonize scala coding style: Remove redundant braces and parentheses, remove meaningless code statements, standardize access patterns, name boolean parameters, unnecessary semicolons, unnecessary braces in import section Adds death watch test cases: Test if JobManager detects failing TaskManager. Test if the TaskManager detects failing JobManager and tries to reconnect to the JobManager. Refactors notifyExecutionStateChange method to avoid access of the TaskManagers internal state from outside This closes #319. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4046819b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4046819b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4046819b Branch: refs/heads/master Commit: 4046819b380b8dfa57d52c6d314f389546a159a3 Parents: 6372063 Author: Till Rohrmann <[email protected]> Authored: Tue Jan 6 11:15:30 2015 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Feb 5 14:47:12 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 8 +- .../org/apache/flink/client/program/Client.java | 8 +- .../flink/client/web/JobsInfoServlet.java | 6 +- .../apache/flink/client/program/ClientTest.java | 3 +- .../flink/configuration/ConfigConstants.java | 15 +- .../examples/scala/clustering/KMeans.scala | 6 +- .../scala/graph/ConnectedComponents.scala | 6 +- .../scala/graph/EnumTrianglesBasic.scala | 6 +- .../examples/scala/graph/EnumTrianglesOpt.scala | 6 +- .../examples/scala/graph/PageRankBasic.scala | 6 +- .../examples/scala/ml/LinearRegression.scala | 6 +- .../examples/scala/wordcount/WordCount.scala | 3 +- .../flink/runtime/executiongraph/Execution.java | 1 - .../scheduler/NoResourceAvailableException.java | 2 +- .../jobmanager/web/LogfileInfoServlet.java | 6 +- .../apache/flink/runtime/taskmanager/Task.java | 28 +- .../apache/flink/runtime/ActorLogMessages.scala | 7 +- 
.../apache/flink/runtime/akka/AkkaUtils.scala | 325 +++++++++----- .../apache/flink/runtime/client/JobClient.scala | 153 +++++-- .../flink/runtime/jobmanager/JobInfo.scala | 13 +- .../flink/runtime/jobmanager/JobManager.scala | 445 +++++++++++-------- .../runtime/jobmanager/JobManagerProfiler.scala | 12 +- .../runtime/jobmanager/MemoryArchivist.scala | 47 +- .../runtime/jobmanager/WithWebServer.scala | 3 + .../runtime/messages/ArchiveMessages.scala | 4 +- .../messages/ExecutionGraphMessages.scala | 6 +- .../runtime/messages/JobmanagerMessages.scala | 20 +- .../runtime/messages/TaskManagerMessages.scala | 10 +- .../runtime/minicluster/FlinkMiniCluster.scala | 55 ++- .../minicluster/LocalFlinkMiniCluster.scala | 25 +- .../taskmanager/MemoryUsageLogging.scala | 21 - .../flink/runtime/taskmanager/TaskManager.scala | 324 ++++++++------ .../TaskManagerCLIConfiguration.scala | 6 + .../taskmanager/TaskManagerProfiler.scala | 37 +- .../executiongraph/AllVerticesIteratorTest.java | 8 +- .../ExecutionGraphConstructionTest.java | 17 +- .../ExecutionGraphDeploymentTest.java | 4 +- .../executiongraph/ExecutionGraphTestUtils.java | 4 +- .../ExecutionStateProgressTest.java | 2 +- .../ExecutionVertexCancelTest.java | 26 +- .../ExecutionVertexDeploymentTest.java | 14 +- .../ExecutionVertexSchedulingTest.java | 6 +- .../executiongraph/PointwisePatternTest.java | 14 +- .../executiongraph/VertexSlotSharingTest.java | 2 +- .../runtime/instance/AllocatedSlotTest.java | 141 ------ .../flink/runtime/instance/SimpleSlotTest.java | 141 ++++++ .../runtime/taskmanager/TaskManagerTest.java | 6 +- .../flink/runtime/taskmanager/TaskTest.java | 53 ++- .../ExecutionGraphRestartTest.scala | 6 +- .../TaskManagerLossFailsTasksTest.scala | 2 +- .../jobmanager/CoLocationConstraintITCase.scala | 2 +- .../jobmanager/JobManagerFailsITCase.scala | 72 +++ .../runtime/jobmanager/JobManagerITCase.scala | 4 +- .../runtime/jobmanager/RecoveryITCase.scala | 2 +- .../runtime/jobmanager/SlotSharingITCase.scala | 2 
+- .../jobmanager/TaskManagerFailsITCase.scala | 37 +- .../TaskManagerRegistrationITCase.scala | 6 +- .../apache/flink/runtime/jobmanager/Tasks.scala | 4 +- .../runtime/testingUtils/TestingCluster.scala | 20 +- .../testingUtils/TestingJobManager.scala | 22 +- .../TestingJobManagerMessages.scala | 3 + .../testingUtils/TestingMemoryArchivist.scala | 3 + .../testingUtils/TestingTaskManager.scala | 35 +- .../TestingTaskManagerMessages.scala | 9 +- .../runtime/testingUtils/TestingUtils.scala | 88 ++-- .../apache/flink/api/scala/ClosureCleaner.scala | 2 +- .../apache/flink/api/scala/CoGroupDataSet.scala | 3 +- .../flink/api/scala/codegen/TreeGen.scala | 2 +- .../api/scala/codegen/TypeInformationGen.scala | 3 +- .../scala/typeutils/CaseClassSerializer.scala | 3 +- .../socket/SocketTextStreamWordCount.scala | 2 + .../examples/windowing/TopSpeedWindowing.scala | 5 +- .../scala/examples/windowing/WindowJoin.scala | 1 + .../api/scala/StreamJoinOperator.scala | 2 +- .../flink/streaming/api/scala/package.scala | 2 + .../java/org/apache/flink/tachyon/HDFSTest.java | 3 - .../flink/test/util/AbstractTestBase.java | 8 +- .../apache/flink/test/util/TestBaseUtils.java | 4 +- .../test/util/ForkableFlinkMiniCluster.scala | 2 +- .../test/cancelling/CancellingTestBase.java | 4 +- .../scala/functions/ClosureCleanerITCase.scala | 18 +- .../api/scala/operators/AggregateITCase.scala | 6 +- .../api/scala/operators/CoGroupITCase.scala | 28 +- .../flink/api/scala/operators/CrossITCase.scala | 20 +- .../api/scala/operators/DistinctITCase.scala | 18 +- .../api/scala/operators/ExamplesITCase.scala | 10 +- .../api/scala/operators/FilterITCase.scala | 16 +- .../api/scala/operators/FirstNITCase.scala | 8 +- .../api/scala/operators/FlatMapITCase.scala | 16 +- .../api/scala/operators/GroupReduceITCase.scala | 73 +-- .../flink/api/scala/operators/JoinITCase.scala | 42 +- .../flink/api/scala/operators/MapITCase.scala | 20 +- .../api/scala/operators/PartitionITCase.scala | 16 +- 
.../api/scala/operators/ReduceITCase.scala | 22 +- .../api/scala/operators/SumMinMaxITCase.scala | 8 +- .../flink/api/scala/operators/UnionITCase.scala | 8 +- .../tuple/base/PairComparatorTestBase.scala | 6 +- .../org/apache/flink/yarn/YarnTestBase.java | 1 - .../org/apache/flink/yarn/FlinkYarnCluster.java | 5 +- .../yarn/appMaster/YarnTaskManagerRunner.java | 5 +- .../apache/flink/yarn/ApplicationClient.scala | 38 +- .../apache/flink/yarn/ApplicationMaster.scala | 9 +- .../org/apache/flink/yarn/YarnJobManager.scala | 29 +- .../org/apache/flink/yarn/YarnTaskManager.scala | 3 +- .../scala/org/apache/flink/yarn/YarnUtils.scala | 48 +- .../java/org/apache/flink/yarn/UtilsTests.java | 1 - 106 files changed, 1666 insertions(+), 1238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index dfd2f70..3455fcd 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -33,7 +33,6 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -796,8 +795,8 @@ public class CliFrontend { } return JobManager.getJobManager(RemoteExecutor.getInetFromHostport(jobManagerAddressStr), - ActorSystem.create("CliFrontendActorSystem", AkkaUtils - .getDefaultActorSystemConfig()),getAkkaTimeout()); + ActorSystem.create("CliFrontendActorSystem", + AkkaUtils.getDefaultAkkaConfig()),getAkkaTimeout()); } @@ -867,8 +866,7 @@ public class CliFrontend { protected FiniteDuration 
getAkkaTimeout(){ Configuration config = getGlobalConfiguration(); - return new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); + return AkkaUtils.getTimeout(config); } public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index f1444ff..9b95d41 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -24,8 +24,8 @@ import java.io.IOException; import java.io.PrintStream; import java.net.InetSocketAddress; import java.util.List; -import java.util.concurrent.TimeUnit; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; @@ -305,14 +305,14 @@ public class Client { } public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException { - Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration); + Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration, + false); ActorRef client = pair._2(); String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - FiniteDuration timeout = new FiniteDuration(configuration.getInteger(ConfigConstants - .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); + FiniteDuration timeout = AkkaUtils.getTimeout(configuration); if(hostname == null){ throw new 
ProgramInvocationException("Could not find hostname of job manager."); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java index e339ec7..f83e9b4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsInfoServlet.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.Iterator; -import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -59,9 +58,8 @@ public class JobsInfoServlet extends HttpServlet { public JobsInfoServlet(Configuration flinkConfig) { this.config = flinkConfig; system = ActorSystem.create("JobsInfoServletActorSystem", - AkkaUtils.getDefaultActorSystemConfig()); - this.timeout = new FiniteDuration(flinkConfig.getInteger(ConfigConstants - .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); + AkkaUtils.getDefaultAkkaConfig()); + this.timeout = AkkaUtils.getTimeout(flinkConfig); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index ea7780c..ba520c3 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -84,6 +84,7 @@ public 
class ClientTest { when(configMock.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)).thenReturn("localhost"); when(configMock.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)).thenReturn(6123); + when(configMock.getString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).thenReturn(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); when(planMock.getJobName()).thenReturn("MockPlan"); // when(mockJarFile.getAbsolutePath()).thenReturn("mockFilePath"); @@ -99,7 +100,7 @@ public class ClientTest { Whitebox.setInternalState(JobClient$.class, mockJobClient); - when(mockJobClient.startActorSystemAndActor(configMock)).thenReturn(new Tuple2<ActorSystem, + when(mockJobClient.startActorSystemAndActor(configMock, false)).thenReturn(new Tuple2<ActorSystem, ActorRef>(mockSystem, mockJobClientActor)); } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a2f2c83..87a3d5f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -65,11 +65,6 @@ public final class ConfigConstants { * for communication with the job manager. */ public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port"; - - /** - * The config parameter defining the akka url of the job manager - */ - public static final String JOB_MANAGER_AKKA_URL = "jobmanager.akka.url"; /** * The config parameter defining the number of handler threads for the jobmanager RPC service. 
@@ -594,29 +589,21 @@ public final class ConfigConstants { // ------------------------------ Akka Values ------------------------------ - public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s"; - public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s"; public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s"; public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0; - public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms"; - - public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "1 m"; - public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12; - public static String DEFAULT_AKKA_TCP_TIMEOUT = "100 s"; - public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15; public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false; public static String DEFAULT_AKKA_FRAMESIZE = "10485760b"; - public static int DEFAULT_AKKA_ASK_TIMEOUT = 100; + public static String DEFAULT_AKKA_ASK_TIMEOUT = "100 s"; // ----------------------------- LocalExecution ---------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index 11430e9..26d01c3 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -112,10 +112,13 @@ object KMeans { centersPath = programArguments(1) outputPath = programArguments(2) numIterations = Integer.parseInt(programArguments(3)) + + true } else { System.err.println("Usage: KMeans <points path> <centers 
path> <result path> <num " + "iterations>") + false } } @@ -128,8 +131,9 @@ object KMeans { "program.") System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num " + "iterations>") + + true } - true } private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala index 2bb6916..e75c862 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala @@ -113,9 +113,12 @@ object ConnectedComponents { edgesPath = args(1) outputPath = args(2) maxIterations = args(3).toInt + + true } else { System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path>" + " <max number of iterations>") + false } } else { @@ -123,8 +126,9 @@ object ConnectedComponents { System.out.println(" Provide parameters to read input data from a file.") System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path>" + " <max number of iterations>") + + true } - true } private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala ---------------------------------------------------------------------- diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala index 2623c0c..a62786c 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala @@ -148,8 +148,11 @@ object EnumTrianglesBasic { if (args.length == 2) { edgePath = args(0) outputPath = args(1) + + true } else { System.err.println("Usage: EnumTriangleBasic <edge path> <result path>") + false } } else { @@ -157,8 +160,9 @@ object EnumTrianglesBasic { System.out.println(" Provide parameters to read input data from files.") System.out.println(" See the documentation for the correct format of input files.") System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>") + + true } - true } private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala index 64aeb77..244e968 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala @@ -215,8 +215,11 @@ object EnumTrianglesOpt { if (args.length == 2) { edgePath = args(0) outputPath = args(1) + + true } else { System.err.println("Usage: 
EnumTriangleOpt <edge path> <result path>") + false } } else { @@ -224,8 +227,9 @@ object EnumTrianglesOpt { System.out.println(" Provide parameters to read input data from files.") System.out.println(" See the documentation for the correct format of input files.") System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>") + + true } - true } private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala index dc42ed1..a3ea4b3 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala @@ -159,9 +159,12 @@ object PageRankBasic { outputPath = args(2) numPages = args(3).toLong maxIterations = args(4).toInt + + true } else { System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " + "pages> <num iterations>") + false } } else { @@ -173,8 +176,9 @@ object PageRankBasic { "pages> <num iterations>") numPages = PageRankData.getNumberOfPages + + true } - true } private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git 
a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala index 508d677..8efb63b 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala @@ -150,9 +150,12 @@ object LinearRegression { dataPath = programArguments(0) outputPath = programArguments(1) numIterations = programArguments(2).toInt + + true } else { System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>") + false } } @@ -164,8 +167,9 @@ object LinearRegression { System.out.println(" We provide a data generator to create synthetic input files for this " + "program.") System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>") + + true } - true } private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala index 49e1a48..b5c2ee2 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala @@ -70,6 +70,7 @@ object WordCount { if (args.length == 2) { textPath = args(0) outputPath = args(1) + true } else { System.err.println("Usage: WordCount <text path> <result path>") 
false @@ -78,8 +79,8 @@ object WordCount { System.out.println("Executing WordCount example with built-in default data.") System.out.println(" Provide parameters to read input data from a file.") System.out.println(" Usage: WordCount <text path> <result path>") + true } - true } private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 27977c3..8c5c673 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -22,7 +22,6 @@ import akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; - import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.PartitionInfo; http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java index 2b86c43..ebf37d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java @@ -35,7 +35,7 @@ public 
class NoResourceAvailableException extends JobException { super("No resource available to schedule unit " + unit + ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."); } - + public NoResourceAvailableException(int numInstances, int numSlotsTotal, int availableSlots) { super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d", BASE_MESSAGE, numInstances, numSlotsTotal, availableSlots)); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java index 05f3ed9..23f1604 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java @@ -33,8 +33,6 @@ import org.slf4j.LoggerFactory; import org.apache.flink.util.StringUtils; -import com.google.common.base.Preconditions; - public class LogfileInfoServlet extends HttpServlet { private static final long serialVersionUID = 1L; @@ -48,7 +46,9 @@ public class LogfileInfoServlet extends HttpServlet { public LogfileInfoServlet(File[] logDirs) { - Preconditions.checkNotNull(logDirs, "The given log files are null."); + if(logDirs == null){ + throw new NullPointerException("The given log files are null."); + } this.logDirs = logDirs; } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 5b6259a..715515e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.taskmanager; import akka.actor.ActorRef; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.RuntimeEnvironment; @@ -39,7 +40,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public final class Task { +public class Task { /** For atomic state updates */ private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = @@ -62,7 +63,7 @@ public final class Task { private final String taskName; - private final TaskManager taskManager; + private final ActorRef taskManager; private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>(); @@ -75,7 +76,7 @@ public final class Task { // -------------------------------------------------------------------------------------------- public Task(JobID jobId, JobVertexID vertexId, int taskIndex, int parallelism, - ExecutionAttemptID executionId, String taskName, TaskManager taskManager) { + ExecutionAttemptID executionId, String taskName, ActorRef taskManager) { this.jobId = jobId; this.vertexId = vertexId; @@ -164,7 +165,7 @@ public final class Task { public boolean markAsFinished() { if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) { notifyObservers(ExecutionState.FINISHED, null); - taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FINISHED, null); + 
notifyExecutionStateChange(ExecutionState.FINISHED, null); return true; } else { @@ -186,7 +187,7 @@ public final class Task { // message back else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error)); - taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, error); + notifyExecutionStateChange(ExecutionState.FAILED, error); return; } } @@ -208,7 +209,7 @@ public final class Task { if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { notifyObservers(ExecutionState.CANCELED, null); - taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null); + notifyExecutionStateChange(ExecutionState.CANCELED, null); return; } } @@ -260,7 +261,7 @@ public final class Task { if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { notifyObservers(ExecutionState.FAILED, null); - taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause); + notifyExecutionStateChange(ExecutionState.FAILED, cause); return; } } @@ -275,7 +276,7 @@ public final class Task { } notifyObservers(ExecutionState.FAILED, null); - taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, cause); + notifyExecutionStateChange(ExecutionState.FAILED, cause); return; } @@ -299,7 +300,7 @@ public final class Task { if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { notifyObservers(ExecutionState.CANCELED, null); - taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null); + notifyExecutionStateChange(ExecutionState.CANCELED, null); return; } } @@ -329,6 +330,15 @@ public final class Task { } } + protected void notifyExecutionStateChange(ExecutionState executionState, + Throwable optionalError) { + LOG.info("Update execution state to " + executionState); + taskManager.tell(new 
JobManagerMessages.UpdateTaskExecutionState( + new TaskExecutionState(jobId, executionId, executionState, optionalError)), + ActorRef.noSender()); + + } + // ----------------------------------------------------------------------------------------------------------------- // Task Profiling // ----------------------------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala index c8ec180..14f0ab0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala @@ -21,6 +21,9 @@ package org.apache.flink.runtime import _root_.akka.actor.Actor import _root_.akka.event.LoggingAdapter +/** + * Mixin to add debug message logging + */ trait ActorLogMessages { self: Actor => @@ -34,14 +37,14 @@ trait ActorLogMessages { _receiveWithLogMessages(x) } else { - log.debug(s"Received message ${x} from ${self.sender}.") + log.debug(s"Received message $x from ${self.sender}.") val start = System.nanoTime() _receiveWithLogMessages(x) val duration = (System.nanoTime() - start) / 1000000 - log.debug(s"Handled message ${x} in ${duration} ms from ${self.sender}.") + log.debug(s"Handled message $x in $duration ms from ${self.sender}.") } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index e6e865e..afbb6dd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -28,40 +28,154 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, Future, Await} import scala.concurrent.duration._ +import scala.language.postfixOps +/** + * This class contains utility functions for akka. It contains methods to start an actor system with + * a given akka configuration. Furthermore, the akka configuration used for starting the different + * actor systems resides in this class. + */ object AkkaUtils { val LOG = LoggerFactory.getLogger(AkkaUtils.getClass) - val DEFAULT_TIMEOUT: FiniteDuration = - FiniteDuration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS) - val INF_TIMEOUT = 21474835 seconds var globalExecutionContext: ExecutionContext = ExecutionContext.global - def createActorSystem(host: String, port: Int, configuration: Configuration): ActorSystem = { - val akkaConfig = ConfigFactory.parseString(AkkaUtils.getConfigString(host, port, configuration)) + /** + * Creates an actor system. If a listening address is specified, then the actor system will listen + * on that address for messages from a remote actor system. If not, then a local actor system + * will be instantiated. + * + * @param configuration instance containing the user provided configuration values + * @param listeningAddress an optional tuple containing a hostname and a port to bind to. If the + * parameter is None, then a local actor system will be created. 
+ * @return created actor system + */ + def createActorSystem(configuration: Configuration, + listeningAddress: Option[(String, Int)]): ActorSystem = { + val akkaConfig = getAkkaConfig(configuration, listeningAddress) createActorSystem(akkaConfig) } - def createActorSystem(): ActorSystem = { - createActorSystem(getDefaultActorSystemConfig) + /** + * Creates an actor system with the given akka config. + * + * @param akkaConfig configuration for the actor system + * @return created actor system + */ + def createActorSystem(akkaConfig: Config): ActorSystem = { + ActorSystem.create("flink", akkaConfig) } - def createLocalActorSystem(): ActorSystem = { - createActorSystem(getDefaultLocalActorSystemConfig) + /** + * Creates an actor system with the default config and listening on a random port of the + * localhost. + * + * @return default actor system listening on a random port of the localhost + */ + def createDefaultActorSystem(): ActorSystem = { + createActorSystem(getDefaultAkkaConfig) } - def createActorSystem(akkaConfig: Config): ActorSystem = { - if(LOG.isDebugEnabled) { - LOG.debug(s"Using akka config to create actor system: $akkaConfig") + /** + * Creates an akka config with the provided configuration values. If the listening address is + * specified, then the actor system will listen on the respective address. + * + * @param configuration instance containing the user provided configuration values + * @param listeningAddress optional tuple of hostname and port to listen on. 
If None is given, + * then an Akka config for local actor system will be returned + * @return Akka config + */ + def getAkkaConfig(configuration: Configuration, + listeningAddress: Option[(String, Int)]): Config = { + val defaultConfig = getBasicAkkaConfig(configuration) + + listeningAddress match { + case Some((hostname, port)) => + val remoteConfig = getRemoteAkkaConfig(configuration, hostname, port) + remoteConfig.withFallback(defaultConfig) + case None => + defaultConfig } - ActorSystem.create("flink", akkaConfig) } - def getConfigString(host: String, port: Int, configuration: Configuration): String = { + /** + * Creates the default akka configuration which listens on a random port on the local machine. + * All configuration values are set to default values. + * + * @return Flink's Akka default config + */ + def getDefaultAkkaConfig: Config = { + getAkkaConfig(new Configuration(), Some(("", 0))) + } + + /** + * Gets the basic Akka config which is shared by remote and local actor systems. 
+ * + * @param configuration instance which contains the user specified values for the configuration + * @return Flink's basic Akka config + */ + private def getBasicAkkaConfig(configuration: Configuration): Config = { + val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT, + ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT) + val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, + ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) + + val logLifecycleEvents = if (lifecycleEvents) "on" else "off" + + val logLevel = getLogLevel + + val config = + s""" + |akka { + | daemonic = on + | + | loggers = ["akka.event.slf4j.Slf4jLogger"] + | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + | log-config-on-start = off + | + | jvm-exit-on-fatal-error = off + | + | serialize-messages = off + | + | loglevel = $logLevel + | stdout-loglevel = $logLevel + | + | log-dead-letters = $logLifecycleEvents + | log-dead-letters-during-shutdown = $logLifecycleEvents + | + | actor { + | default-dispatcher { + | throughput = $akkaThroughput + | + | fork-join-executor { + | parallelism-factor = 2.0 + | } + | } + | } + |} + """.stripMargin + + ConfigFactory.parseString(config) + } + + /** + * Creates a Akka config for a remote actor system listening on port on the network interface + * identified by hostname. 
+ * + * @param configuration instance containing the user provided configuration values + * @param hostname of the network interface to listen on + * @param port to bind to or if 0 then Akka picks a free port automatically + * @return Flink's Akka configuration for remote actor systems + */ + private def getRemoteAkkaConfig(configuration: Configuration, + hostname: String, port: Int): Config = { + val akkaAskTimeout = Duration(configuration.getString(ConfigConstants.AKKA_ASK_TIMEOUT, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) + val startupTimeout = configuration.getString(ConfigConstants.AKKA_STARTUP_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_STARTUP_TIMEOUT) + akkaAskTimeout.toString) val transportHeartbeatInterval = configuration.getString(ConfigConstants. AKKA_TRANSPORT_HEARTBEAT_INTERVAL, ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL) @@ -71,13 +185,13 @@ object AkkaUtils { val transportThreshold = configuration.getDouble(ConfigConstants.AKKA_TRANSPORT_THRESHOLD, ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD) val watchHeartbeatInterval = configuration.getString(ConfigConstants - .AKKA_WATCH_HEARTBEAT_INTERVAL, ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL) + .AKKA_WATCH_HEARTBEAT_INTERVAL, (akkaAskTimeout/10).toString) val watchHeartbeatPause = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, - ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE) + akkaAskTimeout.toString) val watchThreshold = configuration.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD) val akkaTCPTimeout = configuration.getString(ConfigConstants.AKKA_TCP_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_TCP_TIMEOUT) + akkaAskTimeout.toString) val akkaFramesize = configuration.getString(ConfigConstants.AKKA_FRAMESIZE, ConfigConstants.DEFAULT_AKKA_FRAMESIZE) val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT, @@ -87,11 +201,14 @@ object AkkaUtils { val logLifecycleEvents = if 
(lifecycleEvents) "on" else "off" + val logLevel = getLogLevel + val configString = s""" |akka { - | log-dead-letters = $logLifecycleEvents - | log-dead-letters-during-shutdown = $logLifecycleEvents + | actor { + | provider = "akka.remote.RemoteActorRefProvider" + | } | | remote { | startup-timeout = $startupTimeout @@ -108,110 +225,40 @@ object AkkaUtils { | threshold = $watchThreshold | } | - | netty{ - | tcp{ - | hostname = $host + | netty { + | tcp { + | transport-class = "akka.remote.transport.netty.NettyTransport" | port = $port | connection-timeout = $akkaTCPTimeout - | maximum-frame-size = ${akkaFramesize} + | maximum-frame-size = $akkaFramesize + | tcp-nodelay = on | } | } | | log-remote-lifecycle-events = $logLifecycleEvents | } - | - | actor{ - | default-dispatcher{ - | - | throughput = ${akkaThroughput} - | - | fork-join-executor { - | parallelism-factor = 2.0 - | } - | } - | } - | |} """.stripMargin - getDefaultActorSystemConfigString + configString - } - - def getLocalConfigString(configuration: Configuration): String = { - val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT, - ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT) - val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, - ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) - - val logLifecycleEvents = if (lifecycleEvents) "on" else "off" - - val configString = - s""" - |akka { - | log-dead-letters = $logLifecycleEvents - | log-dead-letters-during-shutdown = $logLifecycleEvents - | - | actor{ - | default-dispatcher{ - | - | throughput = ${akkaThroughput} - | - | fork-join-executor { - | parallelism-factor = 2.0 - | } - | } - | } - | - |} - """.stripMargin - - getDefaultLocalActorSystemConfigString + configString - } - - def getDefaultActorSystemConfigString: String = { - val config = """ - |akka { - | actor { - | provider = "akka.remote.RemoteActorRefProvider" - | } - | - | remote{ - | netty{ - | tcp{ - | port = 0 - | 
transport-class = "akka.remote.transport.netty.NettyTransport" - | tcp-nodelay = on - | maximum-frame-size = 1MB - | execution-pool-size = 4 - | } - | } - | } - |} - """.stripMargin - - getDefaultLocalActorSystemConfigString + config - } + val hostnameConfigString = if(hostname != null && hostname.nonEmpty){ + s""" + |akka { + | remote { + | netty { + | tcp { + | hostname = $hostname + | } + | } + | } + |} + """.stripMargin + }else{ + // if hostname is null or empty, then leave hostname unspecified. Akka will pick + // InetAddress.getLocalHost.getHostAddress + "" + } - def getDefaultLocalActorSystemConfigString: String = { - val logLevel = getLogLevel - s""" - |akka { - | daemonic = on - | - | loggers = ["akka.event.slf4j.Slf4jLogger"] - | logger-startup-timeout = 30s - | loglevel = ${logLevel} - | stdout-loglevel = "WARNING" - | jvm-exit-on-fatal-error = off - | log-config-on-start = off - | - | serialize-messages = on - | - | debug { - | lifecycle = on - | } - |} - """.stripMargin + ConfigFactory.parseString(configString + hostnameConfigString) } def getLogLevel: String = { @@ -234,14 +281,6 @@ object AkkaUtils { } } - def getDefaultActorSystemConfig = { - ConfigFactory.parseString(getDefaultActorSystemConfigString) - } - - def getDefaultLocalActorSystemConfig = { - ConfigFactory.parseString(getDefaultLocalActorSystemConfigString) - } - def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout: FiniteDuration): ActorRef = { Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout) @@ -265,6 +304,16 @@ object AkkaUtils { Await.result(future, timeout).asInstanceOf[T] } + /** + * Utility function to construct a future which tries multiple times to execute itself if it + * fails. If the maximum number of tries are exceeded, then the future fails. 
+ * + * @param body function describing the future action + * @param tries number of maximum tries before the future fails + * @param executionContext which shall execute the future + * @tparam T return type of the future + * @return future which tries to recover by re-executing itself a given number of times + */ def retry[T](body: => T, tries: Int)(implicit executionContext: ExecutionContext): Future[T] = { Future{ body }.recoverWith{ case t:Throwable => @@ -276,11 +325,32 @@ object AkkaUtils { } } + /** + * Utility function to construct a future which tries multiple times to execute itself if it + * fails. If the maximum number of tries is exceeded, then the future fails. + * + * @param callable future action + * @param tries maximum number of tries before the future fails + * @param executionContext which shall execute the future + * @tparam T return type of the future + * @return future which tries to recover by re-executing itself a given number of times + */ def retry[T](callable: Callable[T], tries: Int)(implicit executionContext: ExecutionContext): Future[T] = { retry(callable.call(), tries) } + /** + * Utility function to construct a future which tries multiple times to execute itself if it + * fails. If the maximum number of tries is exceeded, then the future fails. + * + * @param target actor which receives the message + * @param message to be sent to the target actor + * @param tries maximum number of tries before the future fails + * @param executionContext which shall execute the future + * @param timeout of the future + * @return future which tries to recover by re-executing itself a given number of times + */ def retry(target: ActorRef, message: Any, tries: Int)(implicit executionContext: ExecutionContext, timeout: FiniteDuration): Future[Any] = { (target ?
message)(timeout) recoverWith{ @@ -292,4 +362,17 @@ object AkkaUtils { } } } + + def getTimeout(config: Configuration): FiniteDuration = { + val duration = Duration(config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)) + + new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) + } + + def getDefaultTimeout: FiniteDuration = { + val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) + + new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index 195a0b6..676ddda 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -20,7 +20,6 @@ package org.apache.flink.runtime.client import java.io.IOException import java.net.InetSocketAddress -import java.util.concurrent.TimeUnit import akka.actor.Status.Failure import akka.actor._ @@ -35,45 +34,58 @@ import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, S import org.apache.flink.runtime.messages.JobManagerMessages._ import scala.concurrent.{TimeoutException, Await} -import scala.concurrent.duration.{FiniteDuration} +import scala.concurrent.duration.FiniteDuration - -class JobClient(jobManagerURL: String, timeout: FiniteDuration) - extends Actor with ActorLogMessages with ActorLogging{ +/** + * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient + * is used to submit jobs to the JobManager and to request the port of the BlobManager. 
+ * + * @param jobManagerURL Akka URL of the JobManager + * @param timeout Timeout used for futures + */ +class JobClient(jobManagerURL: String, timeout: FiniteDuration) extends +Actor with ActorLogMessages with ActorLogging { import context._ val jobManager = AkkaUtils.getReference(jobManagerURL)(system, timeout) override def receiveWithLogMessages: Receive = { case SubmitJobDetached(jobGraph) => - jobManager.tell(SubmitJob(jobGraph, registerForEvents = false, detach = true), sender) + jobManager forward SubmitJob(jobGraph, registerForEvents = false, detached = true) case cancelJob: CancelJob => jobManager forward cancelJob case SubmitJobAndWait(jobGraph, listen) => val listener = context.actorOf(Props(classOf[JobClientListener], sender)) - jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detach = false), listener) + jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detached = false), listener) case RequestBlobManagerPort => jobManager forward RequestBlobManagerPort - case RequestJobManagerStatus => { + case RequestJobManagerStatus => jobManager forward RequestJobManagerStatus - } } } -class JobClientListener(client: ActorRef) extends Actor with ActorLogMessages with ActorLogging { +/** + * Helper actor which listens to status messages from the JobManager and prints them on the + * standard output. Such an actor is started for each job, which is configured to listen to these + * status messages. + * + * @param jobSubmitter Akka URL of the sender of the job + */ +class JobClientListener(jobSubmitter: ActorRef) extends Actor with ActorLogMessages with +ActorLogging { override def receiveWithLogMessages: Receive = { case SubmissionFailure(_, t) => - client ! Failure(t) + jobSubmitter ! Failure(t) self ! PoisonPill case SubmissionSuccess(_) => case JobResultSuccess(_, duration, accumulatorResults) => - client ! new JobExecutionResult(duration, accumulatorResults) + jobSubmitter ! new JobExecutionResult(duration, accumulatorResults) self ! 
PoisonPill case JobResultCanceled(_, msg) => - client ! Failure(new JobExecutionException(msg, true)) + jobSubmitter ! Failure(new JobExecutionException(msg, true)) self ! PoisonPill case JobResultFailed(_, msg) => - client ! Failure(new JobExecutionException(msg, false)) + jobSubmitter ! Failure(new JobExecutionException(msg, false)) self ! PoisonPill case msg => // we have to use System.out.println here to avoid erroneous behavior for output redirection @@ -81,14 +93,20 @@ class JobClientListener(client: ActorRef) extends Actor with ActorLogMessages wi } } +/** + * JobClient's companion object containing convenience functions to start a JobClient actor, parse + * the configuration to extract the JobClient's settings and convenience functions to submit jobs. + */ object JobClient{ val JOB_CLIENT_NAME = "jobclient" - def startActorSystemAndActor(config: Configuration): (ActorSystem, ActorRef) = { - implicit val actorSystem = AkkaUtils.createActorSystem(host = "localhost", - port =0, configuration = config) + def startActorSystemAndActor(config: Configuration, localActorSystem: Boolean): + (ActorSystem, ActorRef) = { + // start a remote actor system to listen on an arbitrary port + implicit val actorSystem = AkkaUtils.createActorSystem(configuration = config, + listeningAddress = Some(("", 0))) - (actorSystem, startActorWithConfiguration(config)) + (actorSystem, startActorWithConfiguration(config, localActorSystem)) } def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem, timeout: FiniteDuration): @@ -96,66 +114,98 @@ object JobClient{ actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL, timeout), JOB_CLIENT_NAME) } - def parseConfiguration(configuration: Configuration): String = { - configuration.getString(ConfigConstants.JOB_MANAGER_AKKA_URL, null) match { - case url: String => url - case _ => - val jobManagerAddress = configuration.getString(ConfigConstants + def startActorWithConfiguration(config: Configuration, localActorSystem: 
Boolean) + (implicit actorSystem: ActorSystem): ActorRef = { + implicit val timeout = AkkaUtils.getTimeout(config) + + startActor(parseConfiguration(config, localActorSystem)) + } + + /** + * Extracts the JobManager's Akka URL from the configuration. If localActorSystem is true, then + * the JobClient is executed in the same actor system as the JobManager. Thus, they can + * communicate locally. + * + * @param configuration Configuration object containing all user provided configuration values + * @param localActorSystem true if the JobClient runs in the same actor system as the JobManager, + * otherwise false + * @return Akka URL of the JobManager + */ + def parseConfiguration(configuration: Configuration, localActorSystem: Boolean): String = { + if(localActorSystem){ + // JobManager and JobClient run in the same ActorSystem + JobManager.getLocalAkkaURL + }else{ + val jobManagerAddress = configuration.getString(ConfigConstants .JOB_MANAGER_IPC_ADDRESS_KEY, null) - val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - if (jobManagerAddress == null) { - throw new RuntimeException("JobManager address has not been specified in the " + - "configuration.") - } + if (jobManagerAddress == null) { + throw new RuntimeException("JobManager address has not been specified in the " + + "configuration.") + } - JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) + JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort) } } - def startActorWithConfiguration(config: Configuration)(implicit actorSystem: ActorSystem): - ActorRef = { - implicit val timeout = FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) - - startActor(parseConfiguration(config)) - } - + /** + * Sends a [[JobGraph]] to the 
JobClient actor specified by jobClient which submits it then to + * the JobManager. The method blocks until the job has finished or the JobManager is no longer + * alive. In the former case, the [[JobExecutionResult]] is returned and in the latter case a + * [[JobExecutionException]] is thrown. + * + * @param jobGraph JobGraph describing the Flink job + * @param listenToStatusEvents true if the JobClient shall print status events of the + * corresponding job, otherwise false + * @param jobClient ActorRef to the JobClient + * @param timeout Timeout for futures + * @throws org.apache.flink.runtime.client.JobExecutionException + * @return The job execution result + */ @throws(classOf[JobExecutionException]) - def submitJobAndWait(jobGraph: JobGraph, listen: Boolean, jobClient: ActorRef) + def submitJobAndWait(jobGraph: JobGraph, listenToStatusEvents: Boolean, jobClient: ActorRef) (implicit timeout: FiniteDuration): JobExecutionResult = { + var waitForAnswer = true var answer: JobExecutionResult = null - val result = - (jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listen))(AkkaUtils.INF_TIMEOUT). - mapTo[JobExecutionResult] + val result =(jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listenToStatusEvents))( + AkkaUtils.INF_TIMEOUT).mapTo[JobExecutionResult] while(waitForAnswer) { try { answer = Await.result(result, timeout) waitForAnswer = false } catch { - case x: TimeoutException => { + case x: TimeoutException => val jmStatus = (jobClient ? RequestJobManagerStatus)(timeout).mapTo[JobManagerStatus] try { Await.result(jmStatus, timeout) } catch { - case t: Throwable => { + case t: Throwable => throw new JobExecutionException("JobManager not reachable anymore. Terminate " + "waiting for job answer.", false) - } } - } } } answer } - + /** + * Submits a job in detached mode. The method sends the corresponding [[JobGraph]] to the + * JobClient specified by jobClient. 
The JobClient does not start a [[JobClientListener]] and + * simply returns the [[SubmissionResponse]] of the [[JobManager]]. The SubmissionResponse is + * then returned by this method. + * + * @param jobGraph Flink job + * @param jobClient ActorRef to the JobClient + * @param timeout Timeout for futures + * @return The submission response + */ def submitJobDetached(jobGraph: JobGraph, jobClient: ActorRef)(implicit timeout: FiniteDuration): SubmissionResponse = { val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout) @@ -163,6 +213,17 @@ object JobClient{ Await.result(response.mapTo[SubmissionResponse],timeout) } + /** + * Uploads the specified jar files of the [[JobGraph]] jobGraph to the BlobServer of the + * JobManager. The respective port is retrieved from the JobManager. + * + * @param jobGraph Flink job containing the information about the required jars + * @param hostname Hostname of the instance on which the BlobServer and also the JobManager run + * @param jobClient ActorRef to the JobClient + * @param timeout Timeout for futures + * @throws java.io.IOException + * @return + */ @throws(classOf[IOException]) def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef)(implicit timeout: FiniteDuration): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala index 8f5d4d4..128454a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala @@ -20,9 +20,18 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorRef -class JobInfo(val client: ActorRef,val start: Long){ +/** +
* Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor + * submitted the job, what the start time was and, if the job has already terminated, what the + * end time was. Additionally, it stores whether the job was started in detached mode. Detached + * means that the submitting actor does not wait for the job result once the job has terminated. + * + * @param client Actor which submitted the job + * @param start Starting time + */ +class JobInfo(val client: ActorRef, val start: Long){ var end: Long = -1 - var detach: Boolean = false + var detached: Boolean = false def duration: Long = { if(end != -1){
