http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index f166c36..5e276bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.ActorRef; - import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -27,6 +25,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -72,25 +71,32 @@ public class RuntimeEnvironment implements Environment { private final ResultPartitionWriter[] writers; private final InputGate[] inputGates; - private final ActorRef jobManagerActor; + private final ActorGateway jobManager; private final AccumulatorRegistry accumulatorRegistry; // ------------------------------------------------------------------------ - public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId, - String taskName, String taskNameWithSubtasks, - int subtaskIndex, int parallelism, - Configuration jobConfiguration, Configuration taskConfiguration, - ClassLoader userCodeClassLoader, - MemoryManager memManager, IOManager ioManager, - BroadcastVariableManager bcVarManager, + public RuntimeEnvironment( + JobID jobId, + JobVertexID jobVertexId, + ExecutionAttemptID executionId, + String taskName, + String taskNameWithSubtasks, + int subtaskIndex, + int parallelism, + Configuration jobConfiguration, + Configuration taskConfiguration, + ClassLoader userCodeClassLoader, + MemoryManager memManager, + IOManager ioManager, + BroadcastVariableManager bcVarManager, AccumulatorRegistry accumulatorRegistry, - InputSplitProvider splitProvider, - Map<String, Future<Path>> distCacheEntries, - ResultPartitionWriter[] writers, - InputGate[] inputGates, - ActorRef jobManagerActor) { + InputSplitProvider splitProvider, + Map<String, Future<Path>> distCacheEntries, + ResultPartitionWriter[] writers, + InputGate[] inputGates, + ActorGateway jobManager) { checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism); @@ -112,7 +118,7 @@ public class RuntimeEnvironment implements Environment { this.distCacheEntries = checkNotNull(distCacheEntries); this.writers = checkNotNull(writers); this.inputGates = checkNotNull(inputGates); - this.jobManagerActor = checkNotNull(jobManagerActor); + this.jobManager = checkNotNull(jobManager); } @@ -238,6 +244,6 @@ public class RuntimeEnvironment implements Environment { } AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(jobId, executionId, checkpointId, serializedState); - jobManagerActor.tell(message, ActorRef.noSender()); + jobManager.tell(message); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 13a2ace..c4f62fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.ActorRef; -import akka.util.Timeout; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache; @@ -37,6 +35,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -163,17 +162,17 @@ public class Task implements Runnable { private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById; - /** The TaskManager actor that spawned this task */ - private final ActorRef taskManager; + /** Gateway to the TaskManager that spawned this task */ + private final ActorGateway taskManager; - /** The JobManager actor */ - private final ActorRef jobManager; + /** Gateway to the JobManager */ + private final ActorGateway jobManager; /** All actors that want to be notified about changes in the task's execution state */ - private final List<ActorRef> executionListenerActors; + private final List<ActorGateway> executionListenerActors; /** The timeout for all ask operations on actors */ - private final Timeout actorAskTimeout; + private final FiniteDuration actorAskTimeout; /** The library cache, from which the task can request its required JAR files */ private final LibraryCacheManager libraryCache; @@ -224,8 +223,8 @@ public class Task implements Runnable { IOManager ioManager, NetworkEnvironment networkEnvironment, BroadcastVariableManager bcVarManager, - ActorRef taskManagerActor, - ActorRef jobManagerActor, + ActorGateway taskManagerActor, + ActorGateway jobManagerActor, FiniteDuration actorAskTimeout, LibraryCacheManager libraryCache, FileCache fileCache) @@ -254,13 +253,13 @@ public class Task implements Runnable { this.jobManager = checkNotNull(jobManagerActor); this.taskManager = checkNotNull(taskManagerActor); - this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout)); + this.actorAskTimeout = checkNotNull(actorAskTimeout); this.libraryCache = checkNotNull(libraryCache); this.fileCache = checkNotNull(fileCache); this.network = checkNotNull(networkEnvironment); - this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>(); + this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>(); // create the reader and writer structures @@ -568,7 +567,7 @@ public class Task implements Runnable { // to know this! notifyObservers(ExecutionState.RUNNING, null); taskManager.tell(new TaskMessages.UpdateTaskExecutionState( - new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)), ActorRef.noSender()); + new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING))); // make sure the user code classloader is accessible thread-locally executingThread.setContextClassLoader(userCodeClassLoader); @@ -750,11 +749,11 @@ public class Task implements Runnable { } private void notifyFinalState() { - taskManager.tell(new TaskInFinalState(executionId), ActorRef.noSender()); + taskManager.tell(new TaskInFinalState(executionId)); } private void notifyFatalError(String message, Throwable cause) { - taskManager.tell(new FatalError(message, cause), ActorRef.noSender()); + taskManager.tell(new FatalError(message, cause)); } // ---------------------------------------------------------------------------------------------------------------- @@ -839,14 +838,10 @@ public class Task implements Runnable { // State Listeners // ------------------------------------------------------------------------ - public void registerExecutionListener(ActorRef listener) { + public void registerExecutionListener(ActorGateway listener) { executionListenerActors.add(listener); } - public void unregisterExecutionListener(ActorRef listener) { - executionListenerActors.remove(listener); - } - private void notifyObservers(ExecutionState newState, Throwable error) { if (error == null) { LOG.info(taskNameWithSubtask + " switched to " + newState); @@ -859,8 +854,8 @@ public class Task implements Runnable { TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(stateUpdate); - for (ActorRef listener : executionListenerActors) { - listener.tell(actorMessage, ActorRef.noSender()); + for (ActorGateway listener : executionListenerActors) { + listener.tell(actorMessage); } } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java index 5a69850..cb78c16 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java @@ -18,13 +18,10 @@ package org.apache.flink.runtime.taskmanager; -import akka.actor.ActorRef; - -import akka.pattern.Patterns; -import akka.util.Timeout; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -32,10 +29,11 @@ import org.apache.flink.util.InstantiationUtil; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; public class TaskInputSplitProvider implements InputSplitProvider { - private final ActorRef jobManager; + private final ActorGateway jobManager; private final JobID jobId; @@ -45,11 +43,15 @@ public class TaskInputSplitProvider implements InputSplitProvider { private final ClassLoader usercodeClassLoader; - private final Timeout timeout; + private final FiniteDuration timeout; - public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId, - ExecutionAttemptID executionID, ClassLoader userCodeClassLoader, - Timeout timeout) + public TaskInputSplitProvider( + ActorGateway jobManager, + JobID jobId, + JobVertexID vertexId, + ExecutionAttemptID executionID, + ClassLoader userCodeClassLoader, + FiniteDuration timeout) { this.jobManager = jobManager; this.jobId = jobId; @@ -62,11 +64,11 @@ public class TaskInputSplitProvider implements InputSplitProvider { @Override public InputSplit getNextInputSplit() { try { - final Future<Object> response = Patterns.ask(jobManager, + final Future<Object> response = jobManager.ask( new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID), timeout); - final Object result = Await.result(response, timeout.duration()); + final Object result = Await.result(response, timeout); if(!(result instanceof JobManagerMessages.NextInputSplit)){ throw new RuntimeException("RequestNextInputSplit requires a response of type " + http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala deleted file mode 100644 index c74c339..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime - -import _root_.akka.actor.Actor - -/** - * Mixin to add debug message logging - */ -trait ActorLogMessages { - that: Actor with ActorSynchronousLogging => - - override def receive: Receive = new Actor.Receive { - private val _receiveWithLogMessages = receiveWithLogMessages - - override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x) - - override def apply(x: Any): Unit = { - if (!log.isDebugEnabled) { - _receiveWithLogMessages(x) - } - else { - log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.") - - val start = System.nanoTime() - - _receiveWithLogMessages(x) - - val duration = (System.nanoTime() - start) / 1000000 - log.debug(s"Handled message $x in $duration ms from ${that.sender()}.") - } - } - } - - def receiveWithLogMessages: Receive -} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala deleted file mode 100644 index 4d3a988..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorSynchronousLogging.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime - -import _root_.akka.actor.Actor -import grizzled.slf4j.Logger - -/** Adds a logger to an [[akka.actor.Actor]] implementation - * - */ -trait ActorSynchronousLogging { - self: Actor => - - lazy val log = Logger(getClass) -} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala new file mode 100644 index 0000000..74a9b07 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime + +import _root_.akka.actor.Actor +import grizzled.slf4j.Logger + +/** Base trait for Flink's actors. + * + * The message handling logic is defined in the handleMessage method. This allows to mixin + * stackable traits which change the message receiving behaviour. + */ +trait FlinkActor extends Actor { + val log: Logger + + override def receive: Receive = handleMessage + + /** Handle incoming messages + * + * @return + */ + def handleMessage: Receive + + /** Factory method for messages. This method can be used by mixins to decorate messages + * + * @param message The message to decorate + * @return The decorated message + */ + def decorateMessage(message: Any): Any = { + message + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala new file mode 100644 index 0000000..c6793ed --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageDecorator.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime + +import java.util.UUID + +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage +import org.apache.flink.runtime.messages.RequiresLeaderSessionID + +/** [[MessageDecorator]] which wraps [[RequiresLeaderSessionID]] messages in a + * [[LeaderSessionMessage]] with the given leader session ID. + * + * @param leaderSessionID Leader session ID which is associated with the + * [[RequiresLeaderSessionID]] message + */ +class LeaderSessionMessageDecorator(val leaderSessionID: Option[UUID]) extends MessageDecorator { + + /** Wraps [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]]. + * + * @param message Message to decorate + * @return Decorated message + */ + override def decorate(message: Any): Any = { + message match { + case msg: RequiresLeaderSessionID => + LeaderSessionMessage(leaderSessionID, msg) + case msg => msg + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala new file mode 100644 index 0000000..d54926d --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessages.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime + +import java.util.UUID + +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage +import org.apache.flink.runtime.messages.RequiresLeaderSessionID + +/** Mixin to filter out [[LeaderSessionMessage]] which contain an invalid leader session id. + * Messages which contain a valid leader session ID are unwrapped and forwarded to the actor. + * + */ +trait LeaderSessionMessages extends FlinkActor { + protected def leaderSessionID: Option[UUID] + + abstract override def receive: Receive = { + case LeaderSessionMessage(id, msg) => + // Filter out messages which have not the correct leader session ID + (leaderSessionID, id) match { + case (Some(currentID), Some(msgID)) => + if(currentID.equals(msgID)) { + // correct leader session ID + super.receive(msg) + } else { + // discard message because of incorrect leader session ID + handleDiscardedMessage(msg) + } + + case _ => handleDiscardedMessage(msg) + } + case msg: RequiresLeaderSessionID => + throw new Exception(s"Received a message $msg without a leader session ID, even though" + + " it requires to have one.") + case msg => + // pass the message to the parent's receive method for further processing + super.receive(msg) + } + + private def handleDiscardedMessage(msg: Any): Unit = { + log.debug(s"Discard message $msg because the leader session ID was not correct.") + } + + /** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]] + * + * @param message The message to decorate + * @return The decorated message + */ + override def decorateMessage(message: Any): Any = { + message match { + case msg: RequiresLeaderSessionID => + LeaderSessionMessage(leaderSessionID, super.decorateMessage(msg)) + + case msg => super.decorateMessage(msg) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala new file mode 100644 index 0000000..e1e8961 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LogMessages.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime + +/** Mixin to add message logging if the debug log level is activated + * + */ +trait LogMessages extends FlinkActor { + abstract override def receive: Receive = { + val _receive = super.receive + + new Receive { + override def isDefinedAt(x: Any): Boolean = _receive.isDefinedAt(x) + + override def apply(x: Any): Unit = { + if (!log.isDebugEnabled) { + _receive(x) + } + else { + log.debug(s"Received message $x at ${context.self.path} from ${context.sender()}.") + + val start = System.nanoTime() + + _receive(x) + + val duration = (System.nanoTime() - start) / 1000000 + log.debug(s"Handled message $x in $duration ms from ${context.sender()}.") + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala new file mode 100644 index 0000000..5b1700f --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/MessageDecorator.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime + +/** Base trait for message decorators + * + */ +trait MessageDecorator { + + /** Decorates a message + * + * @param message Message to decorate + * @return Decorated message + */ + def decorate(message: Any): Any +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index d38e503..b8cce41 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -62,8 +62,10 @@ object AkkaUtils { * parameter is None, then a local actor system will be created. * @return created actor system */ - def createActorSystem(configuration: Configuration, - listeningAddress: Option[(String, Int)]): ActorSystem = { + def createActorSystem( + configuration: Configuration, + listeningAddress: Option[(String, Int)]) + : ActorSystem = { val akkaConfig = getAkkaConfig(configuration, listeningAddress) createActorSystem(akkaConfig) } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4325e41..7bf4447 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} import java.lang.reflect.{InvocationTargetException, Constructor} import java.net.InetSocketAddress -import java.util.Collections +import java.util.{UUID, Collections} import akka.actor.Status.{Failure, Success} import akka.actor._ @@ -51,10 +51,11 @@ import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.ZooKeeperUtil import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation} import org.apache.flink.runtime.webmonitor.WebMonitor -import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} +import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessages} +import org.apache.flink.runtime.{LogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, InstanceManager} import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus} import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -62,6 +63,7 @@ import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat} import org.apache.flink.util.{ExceptionUtils, InstantiationUtil} +import _root_.akka.pattern.ask import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.forkjoin.ForkJoinPool @@ -107,11 +109,17 @@ class JobManager( protected val delayBetweenRetries: Long, protected val timeout: FiniteDuration, protected val mode: StreamingMode) - extends Actor with ActorLogMessages with ActorSynchronousLogging { + extends FlinkActor + with LeaderSessionMessages // order of the mixin is important, we want filtering after logging + with LogMessages // order of the mixin is important, we want first logging + { + + override val log = Logger(getClass) /** List of current jobs running jobs */ protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() + override val leaderSessionID = Some(UUID.randomUUID()) /** * Run when the job manager is started. Simply logs an informational message. @@ -125,10 +133,10 @@ class JobManager( // disconnect the registered task managers instanceManager.getAllRegisteredInstances.asScala.foreach { - _.getInstanceGateway().tell(Disconnect("JobManager is shutting down")) + _.getActorGateway().tell(Disconnect("JobManager is shutting down")) } - archive ! PoisonPill + archive ! decorateMessage(PoisonPill) for((e,_) <- currentJobs.values) { e.fail(new Exception("The JobManager is shutting down.")) @@ -151,26 +159,48 @@ class JobManager( * * @return */ - override def receiveWithLogMessages: Receive = { + override def handleMessage: Receive = { - case RegisterTaskManager(taskManager, connectionInfo, hardwareInformation, numberOfSlots) => + case RegisterTaskManager( + registrationSessionID, + taskManager, + connectionInfo, + hardwareInformation, + numberOfSlots) => if (instanceManager.isRegistered(taskManager)) { val instanceID = instanceManager.getRegisteredInstance(taskManager).getId // IMPORTANT: Send the response to the "sender", which is not the // TaskManager actor, but the ask future! - sender() ! AlreadyRegistered(self, instanceID, libraryCacheManager.getBlobServerPort) + sender() ! decorateMessage( + AlreadyRegistered( + registrationSessionID, + leaderSessionID.get, + self, + instanceID, + libraryCacheManager.getBlobServerPort) + ) } else { try { - val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo, - hardwareInformation, numberOfSlots) + val instanceID = instanceManager.registerTaskManager( + taskManager, + connectionInfo, + hardwareInformation, + numberOfSlots, + leaderSessionID) // IMPORTANT: Send the response to the "sender", which is not the // TaskManager actor, but the ask future! - sender() ! AcknowledgeRegistration(self, instanceID, - libraryCacheManager.getBlobServerPort) + sender() ! decorateMessage( + AcknowledgeRegistration( + registrationSessionID, + leaderSessionID.get, + self, + instanceID, + libraryCacheManager.getBlobServerPort) + ) // to be notified when the taskManager is no longer reachable context.watch(taskManager) @@ -183,15 +213,19 @@ class JobManager( // IMPORTANT: Send the response to the "sender", which is not the // TaskManager actor, but the ask future! - sender() ! RefuseRegistration(ExceptionUtils.stringifyException(e)) + sender() ! decorateMessage( + RefuseRegistration( + registrationSessionID, + ExceptionUtils.stringifyException(e)) + ) } } case RequestNumberRegisteredTaskManager => - sender ! instanceManager.getNumberOfRegisteredTaskManagers + sender ! decorateMessage(instanceManager.getNumberOfRegisteredTaskManagers) case RequestTotalNumberOfSlots => - sender ! instanceManager.getTotalNumberOfSlots + sender ! decorateMessage(instanceManager.getTotalNumberOfSlots) case SubmitJob(jobGraph, listen) => submitJob(jobGraph, listenToEvents = listen) @@ -206,16 +240,19 @@ class JobManager( executionGraph.cancel() }(context.dispatcher) - sender ! CancellationSuccess(jobID) + sender ! decorateMessage(CancellationSuccess(jobID)) case None => log.info(s"No job found with ID $jobID.") - sender ! CancellationFailure(jobID, new IllegalArgumentException("No job found with " + - s"ID $jobID.")) + sender ! decorateMessage( + CancellationFailure( + jobID, + new IllegalArgumentException(s"No job found with ID $jobID.")) + ) } case UpdateTaskExecutionState(taskExecutionState) => if (taskExecutionState == null) { - sender ! false + sender ! decorateMessage(false) } else { currentJobs.get(taskExecutionState.getJobID) match { case Some((executionGraph, _)) => @@ -223,13 +260,13 @@ class JobManager( Future { val result = executionGraph.updateState(taskExecutionState) - originalSender ! result + originalSender ! decorateMessage(result) }(context.dispatcher) case None => log.error("Cannot find execution graph for ID " + s"${taskExecutionState.getJobID} to change state to " + s"${taskExecutionState.getExecutionState}.") - sender ! false + sender ! decorateMessage(false) } } @@ -283,7 +320,7 @@ class JobManager( null } - sender ! NextInputSplit(serializedInputSplit) + sender ! decorateMessage(NextInputSplit(serializedInputSplit)) case checkpointMessage : AbstractCheckpointMessage => handleCheckpointMessage(checkpointMessage) @@ -310,20 +347,31 @@ class JobManager( } val result = new SerializedJobExecutionResult(jobID, jobInfo.duration, accumulatorResults) - jobInfo.client ! JobResultSuccess(result) + jobInfo.client ! decorateMessage(JobResultSuccess(result)) case JobStatus.CANCELED => - jobInfo.client ! Failure(new JobCancellationException(jobID, - "Job was cancelled.", error)) + jobInfo.client ! decorateMessage( + Failure( + new JobCancellationException( + jobID, + "Job was cancelled.", error) + ) + ) case JobStatus.FAILED => - jobInfo.client ! Failure(new JobExecutionException(jobID, - "Job execution failed.", error)) + jobInfo.client ! decorateMessage( + Failure( + new JobExecutionException( + jobID, + "Job execution failed.", + error) + ) + ) case x => val exception = new JobExecutionException(jobID, s"$x is not a " + "terminal state.") - jobInfo.client ! Failure(exception) + jobInfo.client ! decorateMessage(Failure(exception)) throw exception } @@ -337,13 +385,17 @@ class JobManager( case ScheduleOrUpdateConsumers(jobId, partitionId) => currentJobs.get(jobId) match { case Some((executionGraph, _)) => - sender ! Acknowledge + sender ! decorateMessage(Acknowledge) executionGraph.scheduleOrUpdateConsumers(partitionId) case None => log.error(s"Cannot find execution graph for job ID $jobId to schedule or update " + s"consumers.") - sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " + - s"$jobId to schedule or update consumers.")) + sender ! decorateMessage( + Failure( + new IllegalStateException("Cannot find execution graph for job ID " + + s"$jobId to schedule or update consumers.") + ) + ) } case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) => @@ -360,15 +412,21 @@ class JobManager( null } - sender ! PartitionState( - taskExecutionId, taskResultId, partitionId.getPartitionId, state) + sender ! decorateMessage( + PartitionState( + taskExecutionId, + taskResultId, + partitionId.getPartitionId, + state) + ) case RequestJobStatus(jobID) => currentJobs.get(jobID) match { - case Some((executionGraph,_)) => sender ! CurrentJobStatus(jobID, executionGraph.getState) + case Some((executionGraph,_)) => + sender ! decorateMessage(CurrentJobStatus(jobID, executionGraph.getState)) case None => // check the archive - archive forward RequestJobStatus(jobID) + archive forward decorateMessage(RequestJobStatus(jobID)) } case RequestRunningJobs => @@ -376,16 +434,21 @@ class JobManager( case (_, (eg, jobInfo)) => eg } - sender ! RunningJobs(executionGraphs) + sender ! decorateMessage(RunningJobs(executionGraphs)) case RequestRunningJobsStatus => try { val jobs = currentJobs map { - case (_, (eg, _)) => new JobStatusMessage(eg.getJobID, eg.getJobName, - eg.getState, eg.getStatusTimestamp(JobStatus.CREATED)) + case (_, (eg, _)) => + new JobStatusMessage( + eg.getJobID, + eg.getJobName, + eg.getState, + eg.getStatusTimestamp(JobStatus.CREATED) + ) } - sender ! RunningJobsStatus(jobs) + sender ! decorateMessage(RunningJobsStatus(jobs)) } catch { case t: Throwable => log.error("Exception while responding to RequestRunningJobsStatus", t) @@ -393,18 +456,22 @@ class JobManager( case RequestJob(jobID) => currentJobs.get(jobID) match { - case Some((eg, _)) => sender ! JobFound(jobID, eg) + case Some((eg, _)) => sender ! decorateMessage(JobFound(jobID, eg)) case None => // check the archive - archive forward RequestJob(jobID) + archive forward decorateMessage(RequestJob(jobID)) } case RequestBlobManagerPort => - sender ! libraryCacheManager.getBlobServerPort + sender ! decorateMessage(libraryCacheManager.getBlobServerPort) case RequestRegisteredTaskManagers => import scala.collection.JavaConverters._ - sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala) + sender ! decorateMessage( + RegisteredTaskManagers( + instanceManager.getAllRegisteredInstances.asScala + ) + ) case Heartbeat(instanceID, metricsReport, accumulators) => log.debug(s"Received hearbeat message from $instanceID.") @@ -420,8 +487,8 @@ class JobManager( case message: InfoMessage => handleInfoRequestMessage(message, sender()) case RequestStackTrace(instanceID) => - val gateway = instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway - gateway.forward(SendStackTrace, sender()) + val gateway = instanceManager.getRegisteredInstanceById(instanceID).getActorGateway + gateway.forward(SendStackTrace, new AkkaActorGateway(sender(), leaderSessionID)) case Terminated(taskManager) => if (instanceManager.isRegistered(taskManager)) { @@ -432,7 +499,7 @@ class JobManager( } case RequestJobManagerStatus => - sender() ! JobManagerStatusAlive + sender() ! decorateMessage(JobManagerStatusAlive) case Disconnect(msg) => val taskManager = sender() @@ -443,6 +510,9 @@ class JobManager( instanceManager.unregisterTaskManager(taskManager) context.unwatch(taskManager) } + + case RequestLeaderSessionID => + sender() ! ResponseLeaderSessionID(leaderSessionID) } /** @@ -456,7 +526,11 @@ class JobManager( */ private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = { if (jobGraph == null) { - sender ! Failure(new JobSubmissionException(null, "JobGraph must not be null.")) + sender ! decorateMessage( + Failure( + new JobSubmissionException(null, "JobGraph must not be null.") + ) + ) } else { val jobId = jobGraph.getJobID @@ -499,7 +573,8 @@ class JobManager( timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), - JobInfo(sender(), System.currentTimeMillis())) + JobInfo(sender(), System.currentTimeMillis()) + ) )._1 // configure the execution graph @@ -577,23 +652,29 @@ class JobManager( val confirmVertices: java.util.List[ExecutionJobVertex] = snapshotSettings.getVerticesToConfirm.asScala.map(idToVertex).asJava - executionGraph.enableSnaphotCheckpointing( - snapshotSettings.getCheckpointInterval, snapshotSettings.getCheckpointTimeout, - triggerVertices, ackVertices, confirmVertices, - context.system) + executionGraph.enableSnapshotCheckpointing( + snapshotSettings.getCheckpointInterval, + snapshotSettings.getCheckpointTimeout, + triggerVertices, + ackVertices, + confirmVertices, + context.system, + leaderSessionID) } // get notified about job status changes - executionGraph.registerJobStatusListener(self) + executionGraph.registerJobStatusListener(new AkkaActorGateway(self, leaderSessionID)) if (listenToEvents) { // the sender wants to be notified about state changes - executionGraph.registerExecutionListener(sender()) - executionGraph.registerJobStatusListener(sender()) + val gateway = new AkkaActorGateway(sender(), leaderSessionID) + + executionGraph.registerExecutionListener(gateway) + executionGraph.registerJobStatusListener(gateway) } // done with submitting the job - sender ! Success(jobGraph.getJobID) + sender ! decorateMessage(Success(jobGraph.getJobID)) } catch { case t: Throwable => @@ -612,7 +693,7 @@ class JobManager( new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t) } - sender ! Failure(rt) + sender ! decorateMessage(Failure(rt)) return } @@ -690,14 +771,14 @@ class JobManager( currentJobs.get(jobID) match { case Some((graph, jobInfo)) => val accumulatorValues = graph.getAccumulatorsSerialized() - sender() ! AccumulatorResultsFound(jobID, accumulatorValues) + sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues)) case None => archive.forward(message) } } catch { case e: Exception => log.error("Cannot serialize accumulator result.", e) - sender() ! AccumulatorResultsErroneous(jobID, e) + sender() ! decorateMessage(AccumulatorResultsErroneous(jobID, e)) } case RequestAccumulatorResultsStringified(jobId) => @@ -830,7 +911,7 @@ class JobManager( try { eg.prepareForArchiving() - archive ! ArchiveExecutionGraph(jobID, eg) + archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg)) } catch { case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " + "archiving.", t) @@ -967,11 +1048,13 @@ object JobManager { * @param listeningAddress The hostname where the JobManager should listen for messages. * @param listeningPort The port where the JobManager should listen for messages. */ - def runJobManager(configuration: Configuration, - executionMode: JobManagerMode, - streamingMode: StreamingMode, - listeningAddress: String, - listeningPort: Int) : Unit = { + def runJobManager( + configuration: Configuration, + executionMode: JobManagerMode, + streamingMode: StreamingMode, + listeningAddress: String, + listeningPort: Int) + : Unit = { LOG.info("Starting JobManager") @@ -979,8 +1062,10 @@ object JobManager { LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.") val jobManagerSystem = try { - val akkaConfig = AkkaUtils.getAkkaConfig(configuration, - Some((listeningAddress, listeningPort))) + val akkaConfig = AkkaUtils.getAkkaConfig( + configuration, + Some((listeningAddress, listeningPort)) + ) if (LOG.isDebugEnabled) { LOG.debug("Using akka configuration\n " + akkaConfig) } @@ -1003,14 +1088,20 @@ object JobManager { try { // bring up the job manager actor LOG.info("Starting JobManager actor") - val (jobManager, archiver) = startJobManagerActors(configuration, - jobManagerSystem, streamingMode) + val (jobManager, archiver) = startJobManagerActors( + configuration, + jobManagerSystem, + streamingMode) // start a process reaper that watches the JobManager. If the JobManager actor dies, // the process reaper will kill the JVM process (to ensure easy failure detection) LOG.debug("Starting JobManager process reaper") jobManagerSystem.actorOf( - Props(classOf[ProcessReaper], jobManager, LOG.logger, RUNTIME_FAILURE_RETURN_CODE), + Props( + classOf[ProcessReaper], + jobManager, + LOG.logger, + RUNTIME_FAILURE_RETURN_CODE), "JobManager_Process_Reaper") // bring up a local task manager, if needed @@ -1018,16 +1109,22 @@ object JobManager { LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode") val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor( - configuration, jobManagerSystem, - listeningAddress, - Some(TaskManager.TASK_MANAGER_NAME), - Some(jobManager.path.toString), - true, streamingMode, - classOf[TaskManager]) + configuration, + jobManagerSystem, + listeningAddress, + Some(TaskManager.TASK_MANAGER_NAME), + Some(jobManager.path.toString), + true, + streamingMode, + classOf[TaskManager]) LOG.debug("Starting TaskManager process reaper") jobManagerSystem.actorOf( - Props(classOf[ProcessReaper], taskManagerActor, LOG.logger, RUNTIME_FAILURE_RETURN_CODE), + Props( + classOf[ProcessReaper], + taskManagerActor, + LOG.logger, + RUNTIME_FAILURE_RETURN_CODE), "TaskManager_Process_Reaper") } @@ -1257,13 +1354,18 @@ object JobManager { * @param actorSystem Teh actor system running the JobManager * @return A tuple of references (JobManager Ref, Archiver Ref) */ - def startJobManagerActors(configuration: Configuration, - actorSystem: ActorSystem, - streamingMode: StreamingMode): (ActorRef, ActorRef) = { + def startJobManagerActors( + configuration: Configuration, + actorSystem: ActorSystem, + streamingMode: StreamingMode) + : (ActorRef, ActorRef) = { - startJobManagerActors(configuration, actorSystem, - Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME), - streamingMode) + startJobManagerActors( + configuration, + actorSystem, + Some(JOB_MANAGER_NAME), + Some(ARCHIVE_NAME), + streamingMode) } /** * Starts the JobManager and job archiver based on the given configuration, in the @@ -1279,11 +1381,13 @@ object JobManager { * * @return A tuple of references (JobManager Ref, Archiver Ref) */ - def startJobManagerActors(configuration: Configuration, - actorSystem: ActorSystem, - jobMangerActorName: Option[String], - archiverActorName: Option[String], - streamingMode: StreamingMode): (ActorRef, ActorRef) = { + def startJobManagerActors( + configuration: Configuration, + actorSystem: ActorSystem, + jobMangerActorName: Option[String], + archiverActorName: Option[String], + streamingMode: StreamingMode) + : (ActorRef, ActorRef) = { val (executionContext, instanceManager, @@ -1352,9 +1456,11 @@ object JobManager { "akka://flink/user/" + JOB_MANAGER_NAME } - def getJobManagerRemoteReferenceFuture(address: InetSocketAddress, - system: ActorSystem, - timeout: FiniteDuration): Future[ActorRef] = { + def getJobManagerRemoteReferenceFuture( + address: InetSocketAddress, + system: ActorSystem, + timeout: FiniteDuration) + : Future[ActorRef] = { AkkaUtils.getReference(getRemoteJobManagerAkkaURL(address), system, timeout) } @@ -1369,9 +1475,12 @@ object JobManager { * @return The ActorRef to the JobManager */ @throws(classOf[IOException]) - def getJobManagerRemoteReference(jobManagerUrl: String, - system: ActorSystem, - timeout: FiniteDuration): ActorRef = { + def getJobManagerRemoteReference( + jobManagerUrl: String, + system: ActorSystem, + timeout: FiniteDuration) + : ActorRef = { + try { val future = AkkaUtils.getReference(jobManagerUrl, system, timeout) Await.result(future, timeout) @@ -1397,9 +1506,11 @@ object JobManager { * @return The ActorRef to the JobManager */ @throws(classOf[IOException]) - def getJobManagerRemoteReference(address: InetSocketAddress, - system: ActorSystem, - timeout: FiniteDuration): ActorRef = { + def getJobManagerRemoteReference( + address: InetSocketAddress, + system: ActorSystem, + timeout: FiniteDuration) + : ActorRef = { val jmAddress = getRemoteJobManagerAkkaURL(address) getJobManagerRemoteReference(jmAddress, system, timeout) @@ -1415,14 +1526,38 @@ object JobManager { * @return The ActorRef to the JobManager */ @throws(classOf[IOException]) - def getJobManagerRemoteReference(address: InetSocketAddress, - system: ActorSystem, - config: Configuration): ActorRef = { + def getJobManagerRemoteReference( + address: InetSocketAddress, + system: ActorSystem, + config: Configuration) + : ActorRef = { val timeout = AkkaUtils.getLookupTimeout(config) getJobManagerRemoteReference(address, system, timeout) } + /** Returns the [[ActorGateway]] for the provided JobManager. The function automatically + * retrieves the current leader session ID from the JobManager and instantiates the + * [[AkkaActorGateway]] with it. + * + * @param jobManager ActorRef to the [[JobManager]] + * @param timeout Timeout for the blocking leader session ID retrieval + * @throws java.lang.Exception + * @return Gateway to the specified JobManager + */ + @throws(classOf[Exception]) + def getJobManagerGateway( + jobManager: ActorRef, + timeout: FiniteDuration + ): ActorGateway = { + val futureLeaderSessionID = (jobManager ? RequestLeaderSessionID)(timeout) + .mapTo[ResponseLeaderSessionID] + + val leaderSessionID = Await.result(futureLeaderSessionID, timeout).leaderSessionID + + new AkkaActorGateway(jobManager, leaderSessionID) + } + // -------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 7572e72..9f228ed 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -20,13 +20,12 @@ package org.apache.flink.runtime.jobmanager import java.util -import akka.actor.Actor - +import grizzled.slf4j.Logger import org.apache.flink.api.common.JobID import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.messages.accumulators._ +import org.apache.flink.runtime.{FlinkActor, LogMessages} import org.apache.flink.runtime.messages.webmonitor._ -import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.executiongraph.ExecutionGraph import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -56,9 +55,11 @@ import scala.collection.mutable * @param max_entries Maximum number of stored Flink jobs */ class MemoryArchivist(private val max_entries: Int) - extends Actor - with ActorLogMessages - with ActorSynchronousLogging { + extends FlinkActor + with LogMessages { + + override val log = Logger(getClass) + /* * Map of execution graphs belonging to recently started jobs with the time stamp of the last * received job event. The insert order is preserved through a LinkedHashMap. @@ -70,41 +71,45 @@ class MemoryArchivist(private val max_entries: Int) var canceledCnt: Int = 0 var failedCnt: Int = 0 - override def receiveWithLogMessages: Receive = { + override def handleMessage: Receive = { /* Receive Execution Graph to archive */ case ArchiveExecutionGraph(jobID, graph) => // wrap graph inside a soft reference graphs.update(jobID, graph) + // update job counters graph.getState match { case JobStatus.FINISHED => finishedCnt += 1 case JobStatus.CANCELED => canceledCnt += 1 case JobStatus.FAILED => failedCnt += 1 + // ignore transitional states, e.g. Cancelling, Running, Failing, etc. + case _ => } + trimHistory() case RequestArchivedJob(jobID: JobID) => val graph = graphs.get(jobID) - sender ! ArchivedJob(graph) + sender ! decorateMessage(ArchivedJob(graph)) case RequestArchivedJobs => - sender ! ArchivedJobs(graphs.values) + sender ! decorateMessage(ArchivedJobs(graphs.values)) case RequestJob(jobID) => graphs.get(jobID) match { - case Some(graph) => sender ! JobFound(jobID, graph) - case None => sender ! JobNotFound(jobID) + case Some(graph) => sender ! decorateMessage(JobFound(jobID, graph)) + case None => sender ! decorateMessage(JobNotFound(jobID)) } case RequestJobStatus(jobID) => graphs.get(jobID) match { - case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState) - case None => sender ! JobNotFound(jobID) + case Some(graph) => sender ! decorateMessage(CurrentJobStatus(jobID, graph.getState)) + case None => sender ! decorateMessage(JobNotFound(jobID)) } case RequestJobCounts => - sender ! (finishedCnt, canceledCnt, failedCnt) + sender ! decorateMessage((finishedCnt, canceledCnt, failedCnt)) case _ : RequestJobsOverview => try { http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala index 6bc59a2..0cb3b0d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala @@ -45,11 +45,18 @@ object ExecutionGraphMessages { * @param timestamp of the execution state change * @param optionalMessage */ - case class ExecutionStateChanged(jobID: JobID, vertexID: JobVertexID, - taskName: String, totalNumberOfSubTasks: Int, subtaskIndex: Int, - executionID: ExecutionAttemptID, - newExecutionState: ExecutionState, timestamp: Long, - optionalMessage: String){ + case class ExecutionStateChanged( + jobID: JobID, + vertexID: JobVertexID, + taskName: String, + totalNumberOfSubTasks: Int, + subtaskIndex: Int, + executionID: ExecutionAttemptID, + newExecutionState: ExecutionState, + timestamp: Long, + optionalMessage: String) + extends RequiresLeaderSessionID { + override def toString: String = { val oMsg = if (optionalMessage != null) { s"\n$optionalMessage" @@ -69,8 +76,12 @@ object ExecutionGraphMessages { * @param timestamp * @param error */ - case class JobStatusChanged(jobID: JobID, newJobStatus: JobStatus, timestamp: Long, - error: Throwable){ + case class JobStatusChanged( + jobID: JobID, + newJobStatus: JobStatus, + timestamp: Long, + error: Throwable) + extends RequiresLeaderSessionID { override def toString: String = { s"${timestampToString(timestamp)}\tJob execution switched to status $newJobStatus." } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index c9a2878..e38986b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.messages +import java.util.UUID + import org.apache.flink.api.common.JobID import org.apache.flink.runtime.client.{SerializedJobExecutionResult, JobStatusMessage} import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} @@ -32,6 +34,8 @@ import scala.collection.JavaConverters._ */ object JobManagerMessages { + case class LeaderSessionMessage(leaderSessionID: Option[UUID], message: Any) + /** * Submits a job to the job manager. If [[registerForEvents]] is true, * then the sender will be registered as listener for the state change messages. @@ -41,6 +45,7 @@ object JobManagerMessages { * @param registerForEvents if true, then register for state change events */ case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean) + extends RequiresLeaderSessionID /** * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is @@ -48,19 +53,22 @@ object JobManagerMessages { * * @param jobID */ - case class CancelJob(jobID: JobID) + case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID /** * Requesting next input split for the * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]] * of the job specified by [[jobID]]. The next input split is sent back to the sender as a - * [[org.apache.flink.runtime.messages.TaskManagerMessages.NextInputSplit]] message. + * [[NextInputSplit]] message. * * @param jobID * @param vertexID */ - case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID, executionAttempt: - ExecutionAttemptID) + case class RequestNextInputSplit( + jobID: JobID, + vertexID: JobVertexID, + executionAttempt: ExecutionAttemptID) + extends RequiresLeaderSessionID /** * Contains the next input split for a task. This message is a response to @@ -80,10 +88,12 @@ object JobManagerMessages { * @param taskExecutionId The execution attempt ID of the task requesting the partition state. * @param taskResultId The input gate ID of the task requesting the partition state. */ - case class RequestPartitionState(jobId: JobID, - partitionId: ResultPartitionID, - taskExecutionId: ExecutionAttemptID, - taskResultId: IntermediateDataSetID) + case class RequestPartitionState( + jobId: JobID, + partitionId: ResultPartitionID, + taskExecutionId: ExecutionAttemptID, + taskResultId: IntermediateDataSetID) + extends RequiresLeaderSessionID /** * Notifies the [[org.apache.flink.runtime.jobmanager.JobManager]] about available data for a @@ -101,8 +111,7 @@ object JobManagerMessages { * @see [[org.apache.flink.runtime.io.network.partition.ResultPartition]] */ case class ScheduleOrUpdateConsumers(jobId: JobID, partitionId: ResultPartitionID) - - case class ConsumerNotificationResult(success: Boolean, error: Option[Throwable] = None) + extends RequiresLeaderSessionID /** * Requests the current [[JobStatus]] of the job identified by [[jobID]]. This message triggers @@ -142,6 +151,17 @@ object JobManagerMessages { */ case object RequestBlobManagerPort + /** Requests the current leader session ID of the job manager. The result is sent back to the + * sender as an [[ResponseLeaderSessionID]] + */ + case object RequestLeaderSessionID + + /** Response to the [[RequestLeaderSessionID]] message. + * + * @param leaderSessionID + */ + case class ResponseLeaderSessionID(leaderSessionID: Option[UUID]) + /** * Denotes a successful job execution. */ @@ -300,4 +320,8 @@ object JobManagerMessages { def getJobManagerStatusAlive : AnyRef = { JobManagerStatusAlive } + + def getRequestLeaderSessionID: AnyRef = { + RequestLeaderSessionID + } } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala index e292a47..75036b3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala @@ -39,7 +39,7 @@ object Messages { * * @param reason The reason for disconnecting, to be displayed in log and error messages. */ - case class Disconnect(reason: String) + case class Disconnect(reason: String) extends RequiresLeaderSessionID /** * Accessor for the case object instance, to simplify Java interoperability. http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala index 3051d00..b435ebc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.messages +import java.util.UUID + import akka.actor.ActorRef import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription} @@ -32,7 +34,9 @@ object RegistrationMessages { /** * Marker trait for registration messages. */ - trait RegistrationMessage + trait RegistrationMessage { + def registrationSessionID: UUID + } /** * Triggers the TaskManager to attempt a registration at the JobManager. @@ -42,10 +46,12 @@ object RegistrationMessages { * @param deadline Optional deadline until when the registration must be completed. * @param attempt The attempt number, for logging. */ - case class TriggerTaskManagerRegistration(jobManagerAkkaURL: String, - timeout: FiniteDuration, - deadline: Option[Deadline], - attempt: Int) + case class TriggerTaskManagerRegistration( + registrationSessionID: UUID, + jobManagerAkkaURL: String, + timeout: FiniteDuration, + deadline: Option[Deadline], + attempt: Int) extends RegistrationMessage /** @@ -57,10 +63,12 @@ object RegistrationMessages { * @param resources The TaskManagers resources. * @param numberOfSlots The number of processing slots offered by the TaskManager. */ - case class RegisterTaskManager(taskManager: ActorRef, - connectionInfo: InstanceConnectionInfo, - resources: HardwareDescription, - numberOfSlots: Int) + case class RegisterTaskManager( + registrationSessionID: UUID, + taskManager: ActorRef, + connectionInfo: InstanceConnectionInfo, + resources: HardwareDescription, + numberOfSlots: Int) extends RegistrationMessage /** @@ -71,7 +79,12 @@ object RegistrationMessages { * JobManager. * @param blobPort The server port where the JobManager's BLOB service runs. */ - case class AcknowledgeRegistration(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int) + case class AcknowledgeRegistration( + registrationSessionID: UUID, + leaderSessionID: UUID, + jobManager: ActorRef, + instanceID: InstanceID, + blobPort: Int) extends RegistrationMessage /** @@ -80,7 +93,12 @@ object RegistrationMessages { * @param instanceID The instance ID under which the TaskManager is registered. * @param blobPort The server port where the JobManager's BLOB service runs. */ - case class AlreadyRegistered(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int) + case class AlreadyRegistered( + registrationSessionID: UUID, + leaderSessionID: UUID, + jobManager: ActorRef, + instanceID: InstanceID, + blobPort: Int) extends RegistrationMessage /** @@ -89,6 +107,6 @@ object RegistrationMessages { * * @param reason Reason why the task manager registration was refused */ - case class RefuseRegistration(reason: String) + case class RefuseRegistration(registrationSessionID: UUID, reason: String) extends RegistrationMessage } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala index 9373576..a80ca99 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala @@ -47,7 +47,7 @@ object TaskMessages { * @param tasks Descriptor which contains the information to start the task. */ case class SubmitTask(tasks: TaskDeploymentDescriptor) - extends TaskMessage + extends TaskMessage with RequiresLeaderSessionID /** * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a @@ -56,7 +56,7 @@ object TaskMessages { * @param attemptID The task's execution attempt ID. */ case class CancelTask(attemptID: ExecutionAttemptID) - extends TaskMessage + extends TaskMessage with RequiresLeaderSessionID /** * Triggers a fail of specified task from the outside (as opposed to the task throwing @@ -86,15 +86,16 @@ object TaskMessages { * Answer to a [[RequestPartitionState]] with the state of the respective partition. */ case class PartitionState( - taskExecutionId: ExecutionAttemptID, - taskResultId: IntermediateDataSetID, - partitionId: IntermediateResultPartitionID, - state: ExecutionState) extends TaskMessage + taskExecutionId: ExecutionAttemptID, + taskResultId: IntermediateDataSetID, + partitionId: IntermediateResultPartitionID, + state: ExecutionState) + extends TaskMessage with RequiresLeaderSessionID /** * Base class for messages that update the information about location of input partitions */ - abstract sealed class UpdatePartitionInfo extends TaskMessage { + abstract sealed class UpdatePartitionInfo extends TaskMessage with RequiresLeaderSessionID { def executionID: ExecutionAttemptID } @@ -104,9 +105,10 @@ object TaskMessages { * @param resultId The input reader to update. * @param partitionInfo The partition info update. */ - case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID, - resultId: IntermediateDataSetID, - partitionInfo: InputChannelDeploymentDescriptor) + case class UpdateTaskSinglePartitionInfo( + executionID: ExecutionAttemptID, + resultId: IntermediateDataSetID, + partitionInfo: InputChannelDeploymentDescriptor) extends UpdatePartitionInfo /** @@ -115,8 +117,8 @@ object TaskMessages { * @param partitionInfos List of input gates with channel descriptors to update. */ case class UpdateTaskMultiplePartitionInfos( - executionID: ExecutionAttemptID, - partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) + executionID: ExecutionAttemptID, + partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) extends UpdatePartitionInfo /** @@ -126,7 +128,7 @@ object TaskMessages { * @param executionID The task's execution attempt ID. */ case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID) - extends TaskMessage + extends TaskMessage with RequiresLeaderSessionID // -------------------------------------------------------------------------- @@ -140,7 +142,7 @@ object TaskMessages { * @param taskExecutionState The changed task state */ case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) - extends TaskMessage + extends TaskMessage with RequiresLeaderSessionID /** * Response message to updates in the task state. Send for example as a response to @@ -152,11 +154,11 @@ object TaskMessages { * @param success indicating whether the operation has been successful * @param description Optional description for unsuccessful results. */ - case class TaskOperationResult(executionID: ExecutionAttemptID, - success: Boolean, - description: String) - extends TaskMessage - { + case class TaskOperationResult( + executionID: ExecutionAttemptID, + success: Boolean, + description: String) + extends TaskMessage { def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "") } @@ -166,10 +168,10 @@ object TaskMessages { // -------------------------------------------------------------------------- def createUpdateTaskMultiplePartitionInfos( - executionID: ExecutionAttemptID, - resultIDs: java.util.List[IntermediateDataSetID], - partitionInfos: java.util.List[InputChannelDeploymentDescriptor]): - UpdateTaskMultiplePartitionInfos = { + executionID: ExecutionAttemptID, + resultIDs: java.util.List[IntermediateDataSetID], + partitionInfos: java.util.List[InputChannelDeploymentDescriptor]) + : UpdateTaskMultiplePartitionInfos = { require(resultIDs.size() == partitionInfos.size(), "ResultIDs must have the same length as partitionInfos.") http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 49c701e..6f810fc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -28,8 +28,11 @@ import org.apache.flink.api.common.JobSubmissionResult import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult} +import org.apache.flink.runtime.client.{JobExecutionException, JobClient, +SerializedJobExecutionResult} +import org.apache.flink.runtime.instance.ActorGateway import org.apache.flink.runtime.jobgraph.JobGraph +import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.slf4j.LoggerFactory @@ -132,8 +135,9 @@ abstract class FlinkMiniCluster( AkkaUtils.createActorSystem(config) } - def getJobManager: ActorRef = { - jobManagerActor + def getJobManagerGateway(): ActorGateway = { + // create ActorGateway from the JobManager's ActorRef + JobManager.getJobManagerGateway(jobManagerActor, timeout) } def getTaskManagers = { @@ -206,12 +210,17 @@ abstract class FlinkMiniCluster( val clientActorSystem = if (singleActorSystem) jobManagerActorSystem else JobClient.startJobClientActorSystem(configuration) - JobClient.submitJobAndWait(clientActorSystem, jobManagerActor, jobGraph, timeout, printUpdates) + JobClient.submitJobAndWait( + clientActorSystem, + getJobManagerGateway(), + jobGraph, + timeout, + printUpdates) } @throws(classOf[JobExecutionException]) def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = { - JobClient.submitJobDetached(jobManagerActor, jobGraph, timeout) + JobClient.submitJobDetached(getJobManagerGateway(), jobGraph, timeout) new JobSubmissionResult(jobGraph.getJobID) } }