[FLINK-2425] [runtime] Cleanup code for forwarding config and hostname into 
TaskManager's RuntimeEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ef61de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ef61de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ef61de

Branch: refs/heads/master
Commit: c3ef61de934a9c447ec442449c527ce719ee46c6
Parents: 5bf2197
Author: Stephan Ewen <[email protected]>
Authored: Mon Aug 3 17:41:57 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Aug 3 18:49:45 2015 +0200

----------------------------------------------------------------------
 .../runtime/taskmanager/RuntimeEnvironment.java | 12 ++--
 .../apache/flink/runtime/taskmanager/Task.java  |  8 +--
 .../taskmanager/TaskManagerRuntimeInfo.java     | 61 ++++++++++++++++++++
 .../taskmanager/RuntimeConfiguration.scala      | 23 --------
 .../flink/runtime/taskmanager/TaskManager.scala |  8 ++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  6 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  8 +--
 7 files changed, 80 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index c0dfee6..cd6dbd6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,9 @@ public class RuntimeEnvironment implements Environment {
 
        private final AccumulatorRegistry accumulatorRegistry;
 
-       private Configuration taskManagerConfiguration;
+       private final Configuration taskManagerConfiguration;
 
-       private String hostname;
+       private final String hostname;
 
        // 
------------------------------------------------------------------------
 
@@ -95,13 +95,13 @@ public class RuntimeEnvironment implements Environment {
                        MemoryManager memManager,
                        IOManager ioManager,
                        BroadcastVariableManager bcVarManager,
-                                                               
AccumulatorRegistry accumulatorRegistry,
+                       AccumulatorRegistry accumulatorRegistry,
                        InputSplitProvider splitProvider,
                        Map<String, Future<Path>> distCacheEntries,
                        ResultPartitionWriter[] writers,
                        InputGate[] inputGates,
                        ActorGateway jobManager,
-                       RuntimeConfiguration taskManagerConfig) {
+                       TaskManagerRuntimeInfo taskManagerInfo) {
                
                checkArgument(parallelism > 0 && subtaskIndex >= 0 && 
subtaskIndex < parallelism);
 
@@ -124,8 +124,8 @@ public class RuntimeEnvironment implements Environment {
                this.writers = checkNotNull(writers);
                this.inputGates = checkNotNull(inputGates);
                this.jobManager = checkNotNull(jobManager);
-               this.taskManagerConfiguration = 
checkNotNull(taskManagerConfig).configuration();
-               this.hostname = taskManagerConfig.hostname();
+               this.taskManagerConfiguration = 
checkNotNull(taskManagerInfo).getConfiguration();
+               this.hostname = taskManagerInfo.getHostname();
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 878a69a..36de90a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -145,6 +145,9 @@ public class Task implements Runnable {
        /** The name of the class that holds the invokable code */
        private final String nameOfInvokableClass;
 
+       /** Access to task manager configuration and host names*/
+       private final TaskManagerRuntimeInfo taskManagerConfig;
+       
        /** The memory manager to be used by this task */
        private final MemoryManager memoryManager;
 
@@ -214,9 +217,6 @@ public class Task implements Runnable {
         * initialization, to be memory friendly */
        private volatile SerializedValue<StateHandle<?>> operatorState;
 
-       /** Access to task manager configuration and host names*/
-       private RuntimeConfiguration taskManagerConfig;
-
        /**
         * <p><b>IMPORTANT:</b> This constructor may not start any work that 
would need to 
         * be undone in the case of a failing task deployment.</p>
@@ -231,7 +231,7 @@ public class Task implements Runnable {
                                FiniteDuration actorAskTimeout,
                                LibraryCacheManager libraryCache,
                                FileCache fileCache,
-                               RuntimeConfiguration taskManagerConfig)
+                               TaskManagerRuntimeInfo taskManagerConfig)
        {
                checkArgument(tdd.getNumberOfSubtasks() > 0);
                checkArgument(tdd.getIndexInSubtaskGroup() >= 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..8d06f10
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Encapsulation of TaskManager runtime information, like hostname and 
configuration.
+ */
+public class TaskManagerRuntimeInfo implements java.io.Serializable {
+
+       private static final long serialVersionUID = 5598219619760274072L;
+       
+       /** host name of the interface that the TaskManager uses to communicate 
*/
+       private final String hostname;
+
+       /** configuration that the TaskManager was started with */
+       private final Configuration configuration;
+
+       /**
+        * Creates a runtime info.
+        * @param hostname The host name of the interface that the TaskManager 
uses to communicate.
+        * @param configuration The configuration that the TaskManager was 
started with.
+        */
+       public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration) {
+               this.hostname = hostname;
+               this.configuration = configuration;
+       }
+
+       /**
+        * Gets host name of the interface that the TaskManager uses to 
communicate.
+        * @return The host name of the interface that the TaskManager uses to 
communicate.
+        */
+       public String getHostname() {
+               return hostname;
+       }
+
+       /**
+        * Gets the configuration that the TaskManager was started with.
+        * @return The configuration that the TaskManager was started with.
+        */
+       public Configuration getConfiguration() {
+               return configuration;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
deleted file mode 100644
index ef0e705..0000000
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager
-
-import org.apache.flink.configuration.UnmodifiableConfiguration
-
-case class RuntimeConfiguration(hostname: String, configuration: 
UnmodifiableConfiguration)

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3ab271a..cc8b8ba 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -173,6 +173,10 @@ class TaskManager(
 
   private val currentRegistrationSessionID: UUID = UUID.randomUUID()
 
+  private val runtimeInfo = new TaskManagerRuntimeInfo(
+       connectionInfo.getHostname(),
+       new UnmodifiableConfiguration(config.configuration))
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -893,9 +897,7 @@ class TaskManager(
         config.timeout,
         libCache,
         fileCache,
-        new RuntimeConfiguration(
-          self.path.toSerializationFormat,
-          new UnmodifiableConfiguration(config.configuration)))
+        runtimeInfo)
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 08f0094..a7d8d8d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -129,7 +128,6 @@ public class TaskAsyncCallTest {
        }
        
        private static Task createTask() {
-               
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
                
@@ -161,9 +159,7 @@ public class TaskAsyncCallTest {
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
                                mock(FileCache.class),
-                               new RuntimeConfiguration(
-                                               taskManagerGateway.path(),
-                                               new 
UnmodifiableConfiguration(new Configuration())));
+                               new TaskManagerRuntimeInfo("localhost", new 
Configuration()));
        }
        
        public static class CheckpointsInOrderInvokable extends 
AbstractInvokable

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ef61de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 6d9df6d..0cba533 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
+
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
@@ -727,9 +727,7 @@ public class TaskTest {
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
                                mock(FileCache.class),
-                               new RuntimeConfiguration(
-                                               taskManagerGateway.path(),
-                                               new 
UnmodifiableConfiguration(new Configuration())));
+                               new TaskManagerRuntimeInfo("localhost", new 
Configuration()));
        }
 
        private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? 
extends AbstractInvokable> invokable) {

Reply via email to