[FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into TaskManager's RuntimeEnvironment
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ef61de Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ef61de Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ef61de Branch: refs/heads/master Commit: c3ef61de934a9c447ec442449c527ce719ee46c6 Parents: 5bf2197 Author: Stephan Ewen <[email protected]> Authored: Mon Aug 3 17:41:57 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Aug 3 18:49:45 2015 +0200 ---------------------------------------------------------------------- .../runtime/taskmanager/RuntimeEnvironment.java | 12 ++-- .../apache/flink/runtime/taskmanager/Task.java | 8 +-- .../taskmanager/TaskManagerRuntimeInfo.java | 61 ++++++++++++++++++++ .../taskmanager/RuntimeConfiguration.scala | 23 -------- .../flink/runtime/taskmanager/TaskManager.scala | 8 ++- .../runtime/taskmanager/TaskAsyncCallTest.java | 6 +- .../flink/runtime/taskmanager/TaskTest.java | 8 +-- 7 files changed, 80 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index c0dfee6..cd6dbd6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -75,9 +75,9 @@ public class RuntimeEnvironment implements Environment { private final AccumulatorRegistry accumulatorRegistry; - private Configuration taskManagerConfiguration; + private final Configuration taskManagerConfiguration; - private String hostname; + private final String hostname; // ------------------------------------------------------------------------ @@ -95,13 +95,13 @@ public class RuntimeEnvironment implements Environment { MemoryManager memManager, IOManager ioManager, BroadcastVariableManager bcVarManager, - AccumulatorRegistry accumulatorRegistry, + AccumulatorRegistry accumulatorRegistry, InputSplitProvider splitProvider, Map<String, Future<Path>> distCacheEntries, ResultPartitionWriter[] writers, InputGate[] inputGates, ActorGateway jobManager, - RuntimeConfiguration taskManagerConfig) { + TaskManagerRuntimeInfo taskManagerInfo) { checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism); @@ -124,8 +124,8 @@ public class RuntimeEnvironment implements Environment { this.writers = checkNotNull(writers); this.inputGates = checkNotNull(inputGates); this.jobManager = checkNotNull(jobManager); - this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration(); - this.hostname = taskManagerConfig.hostname(); + this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration(); + this.hostname = taskManagerInfo.getHostname(); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 878a69a..36de90a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -145,6 +145,9 @@ public class Task implements Runnable { /** The name of the class that holds the invokable code */ private final String nameOfInvokableClass; + /** Access to task manager configuration and host names*/ + private final TaskManagerRuntimeInfo taskManagerConfig; + /** The memory manager to be used by this task */ private final MemoryManager memoryManager; @@ -214,9 +217,6 @@ public class Task implements Runnable { * initialization, to be memory friendly */ private volatile SerializedValue<StateHandle<?>> operatorState; - /** Access to task manager configuration and host names*/ - private RuntimeConfiguration taskManagerConfig; - /** * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to * be undone in the case of a failing task deployment.</p> @@ -231,7 +231,7 @@ public class Task implements Runnable { FiniteDuration actorAskTimeout, LibraryCacheManager libraryCache, FileCache fileCache, - RuntimeConfiguration taskManagerConfig) + TaskManagerRuntimeInfo taskManagerConfig) { checkArgument(tdd.getNumberOfSubtasks() > 0); checkArgument(tdd.getIndexInSubtaskGroup() >= 0); http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java new file mode 100644 index 0000000..8d06f10 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.configuration.Configuration; + +/** + * Encapsulation of TaskManager runtime information, like hostname and configuration. + */ +public class TaskManagerRuntimeInfo implements java.io.Serializable { + + private static final long serialVersionUID = 5598219619760274072L; + + /** host name of the interface that the TaskManager uses to communicate */ + private final String hostname; + + /** configuration that the TaskManager was started with */ + private final Configuration configuration; + + /** + * Creates a runtime info. + * @param hostname The host name of the interface that the TaskManager uses to communicate. + * @param configuration The configuration that the TaskManager was started with. + */ + public TaskManagerRuntimeInfo(String hostname, Configuration configuration) { + this.hostname = hostname; + this.configuration = configuration; + } + + /** + * Gets host name of the interface that the TaskManager uses to communicate. + * @return The host name of the interface that the TaskManager uses to communicate. + */ + public String getHostname() { + return hostname; + } + + /** + * Gets the configuration that the TaskManager was started with. + * @return The configuration that the TaskManager was started with. + */ + public Configuration getConfiguration() { + return configuration; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala deleted file mode 100644 index ef0e705..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager - -import org.apache.flink.configuration.UnmodifiableConfiguration - -case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration) http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 3ab271a..cc8b8ba 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -173,6 +173,10 @@ class TaskManager( private val currentRegistrationSessionID: UUID = UUID.randomUUID() + private val runtimeInfo = new TaskManagerRuntimeInfo( + connectionInfo.getHostname(), + new UnmodifiableConfiguration(config.configuration)) + // -------------------------------------------------------------------------- // Actor messages and life cycle // -------------------------------------------------------------------------- @@ -893,9 +897,7 @@ class TaskManager( config.timeout, libCache, fileCache, - new RuntimeConfiguration( - self.path.toSerializationFormat, - new UnmodifiableConfiguration(config.configuration))) + runtimeInfo) log.info(s"Received task ${task.getTaskNameWithSubtasks}") http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 08f0094..a7d8d8d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -129,7 +128,6 @@ public class TaskAsyncCallTest { } private static Task createTask() { - LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader()); @@ -161,9 +159,7 @@ public class TaskAsyncCallTest { new FiniteDuration(60, TimeUnit.SECONDS), libCache, mock(FileCache.class), - new RuntimeConfiguration( - taskManagerGateway.path(), - new UnmodifiableConfiguration(new Configuration()))); + new TaskManagerRuntimeInfo("localhost", new Configuration())); } public static class CheckpointsInOrderInvokable extends AbstractInvokable http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 6d9df6d..0cba533 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -19,9 +19,8 @@ package org.apache.flink.runtime.taskmanager; import com.google.common.collect.Maps; + import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -49,6 +48,7 @@ import org.apache.flink.runtime.messages.TaskMessages; import org.junit.After; import org.junit.Before; import org.junit.Test; + import scala.concurrent.duration.FiniteDuration; import java.lang.reflect.Field; @@ -727,9 +727,7 @@ public class TaskTest { new FiniteDuration(60, TimeUnit.SECONDS), libCache, mock(FileCache.class), - new RuntimeConfiguration( - taskManagerGateway.path(), - new UnmodifiableConfiguration(new Configuration()))); + new TaskManagerRuntimeInfo("localhost", new Configuration())); } private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
