Repository: flink
Updated Branches:
  refs/heads/flip-6 8c656d925 -> 64ee13862


[FLINK-4746] Make TaskManagerRuntimeInfo an interface

Let the TaskManagerConfiguration implement the TaskManagerRuntimeInfo interface
to make some of the TaskManager's configuration values accessible from
different components.

This closes #2599.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64ee1386
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64ee1386
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64ee1386

Branch: refs/heads/flip-6
Commit: 64ee1386294cde6c61c8ee15c5e1d1ad018dcc46
Parents: 8c656d9
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Oct 5 14:47:24 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Oct 12 11:04:34 2016 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      | 11 +---
 .../taskexecutor/TaskManagerConfiguration.java  | 22 +++----
 .../taskmanager/TaskManagerRuntimeInfo.java     | 61 ++------------------
 .../flink/runtime/taskmanager/TaskManager.scala | 11 +---
 .../operators/drivers/TestTaskContext.java      |  4 +-
 .../testutils/BinaryOperatorTestBase.java       |  4 +-
 .../operators/testutils/DriverTestBase.java     |  4 +-
 .../operators/testutils/MockEnvironment.java    | 10 +---
 .../testutils/UnaryOperatorTestBase.java        |  4 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  8 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  3 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  3 +-
 .../util/TestingTaskManagerRuntimeInfo.java     | 52 +++++++++++++++++
 .../tasks/InterruptSensitiveRestoreTest.java    |  5 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  6 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  4 +-
 16 files changed, 95 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 35b639b..a2716e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashSet;
@@ -127,9 +125,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        private final FileCache fileCache;
 
-       // TODO: Try to get rid of it
-       private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
-
        // --------- resource manager --------
 
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
@@ -177,10 +172,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
                this.broadcastVariableManager = 
checkNotNull(broadcastVariableManager);
                this.fileCache = checkNotNull(fileCache);
-               this.taskManagerRuntimeInfo = new TaskManagerRuntimeInfo(
-                       taskManagerLocation.getHostname(),
-                       new 
UnmodifiableConfiguration(taskManagerConfiguration.getConfiguration()),
-                       taskManagerConfiguration.getTmpDirPaths());
 
                this.jobManagerConnections = new HashMap<>(4);
 
@@ -308,7 +299,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        checkpointResponder,
                        libraryCache,
                        fileCache,
-                       taskManagerRuntimeInfo,
+                       taskManagerConfiguration,
                        taskMetricGroup,
                        resultPartitionConsumableNotifier,
                        partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index bce3dc3..1d1e732 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,13 +34,13 @@ import java.io.File;
 /**
  * Configuration object for {@link TaskExecutor}.
  */
-public class TaskManagerConfiguration {
+public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerConfiguration.class);
 
        private final int numberSlots;
 
-       private final String[] tmpDirPaths;
+       private final String[] tmpDirectories;
 
        private final Time timeout;
        // null indicates an infinite duration
@@ -50,12 +51,11 @@ public class TaskManagerConfiguration {
 
        private final long cleanupInterval;
 
-       // TODO: remove necessity for complete configuration object
-       private final Configuration configuration;
+       private final UnmodifiableConfiguration configuration;
 
        public TaskManagerConfiguration(
                int numberSlots,
-               String[] tmpDirPaths,
+               String[] tmpDirectories,
                Time timeout,
                Time maxRegistrationDuration,
                Time initialRegistrationPause,
@@ -65,7 +65,7 @@ public class TaskManagerConfiguration {
                Configuration configuration) {
 
                this.numberSlots = numberSlots;
-               this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths);
+               this.tmpDirectories = 
Preconditions.checkNotNull(tmpDirectories);
                this.timeout = Preconditions.checkNotNull(timeout);
                this.maxRegistrationDuration = maxRegistrationDuration;
                this.initialRegistrationPause = 
Preconditions.checkNotNull(initialRegistrationPause);
@@ -79,10 +79,6 @@ public class TaskManagerConfiguration {
                return numberSlots;
        }
 
-       public String[] getTmpDirPaths() {
-               return tmpDirPaths;
-       }
-
        public Time getTimeout() {
                return timeout;
        }
@@ -107,10 +103,16 @@ public class TaskManagerConfiguration {
                return cleanupInterval;
        }
 
+       @Override
        public Configuration getConfiguration() {
                return configuration;
        }
 
+       @Override
+       public String[] getTmpDirectories() {
+               return tmpDirectories;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Static factory methods
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 9ac982e..d1efe34 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -20,71 +20,22 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.Configuration;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
- * Encapsulation of TaskManager runtime information, like hostname and 
configuration.
+ * Interface to access {@link TaskManager} information.
  */
-public class TaskManagerRuntimeInfo implements java.io.Serializable {
-
-       private static final long serialVersionUID = 5598219619760274072L;
-       
-       /** host name of the interface that the TaskManager uses to communicate 
*/
-       private final String hostname;
-
-       /** configuration that the TaskManager was started with */
-       private final Configuration configuration;
-
-       /** list of temporary file directories */
-       private final String[] tmpDirectories;
-       
-       /**
-        * Creates a runtime info.
-        * 
-        * @param hostname The host name of the interface that the TaskManager 
uses to communicate.
-        * @param configuration The configuration that the TaskManager was 
started with.
-        * @param tmpDirectory The temporary file directory.   
-        */
-       public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration, String tmpDirectory) {
-               this(hostname, configuration, new String[] { tmpDirectory });
-       }
-       
-       /**
-        * Creates a runtime info.
-        * @param hostname The host name of the interface that the TaskManager 
uses to communicate.
-        * @param configuration The configuration that the TaskManager was 
started with.
-        * @param tmpDirectories The list of temporary file directories.   
-        */
-       public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration, String[] tmpDirectories) {
-               checkArgument(tmpDirectories.length > 0);
-               this.hostname = checkNotNull(hostname);
-               this.configuration = checkNotNull(configuration);
-               this.tmpDirectories = tmpDirectories;
-               
-       }
-
-       /**
-        * Gets host name of the interface that the TaskManager uses to 
communicate.
-        * @return The host name of the interface that the TaskManager uses to 
communicate.
-        */
-       public String getHostname() {
-               return hostname;
-       }
+public interface TaskManagerRuntimeInfo {
 
        /**
         * Gets the configuration that the TaskManager was started with.
+        *
         * @return The configuration that the TaskManager was started with.
         */
-       public Configuration getConfiguration() {
-               return configuration;
-       }
+       Configuration getConfiguration();
 
        /**
         * Gets the list of temporary file directories.
+        * 
         * @return The list of temporary file directories.
         */
-       public String[] getTmpDirectories() {
-               return tmpDirectories;
-       }
+       String[] getTmpDirectories();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index da8c14e..26e13ba 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -149,7 +149,7 @@ class TaskManager(
   protected val bcVarManager = new BroadcastVariableManager()
 
   /** Handler for distributed files cached by this TaskManager */
-  protected val fileCache = new FileCache(config.getTmpDirPaths())
+  protected val fileCache = new FileCache(config.getTmpDirectories())
 
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -183,11 +183,6 @@ class TaskManager(
 
   var leaderSessionID: Option[UUID] = None
 
-  private val runtimeInfo = new TaskManagerRuntimeInfo(
-       location.getHostname(),
-       new UnmodifiableConfiguration(config.getConfiguration()),
-       config.getTmpDirPaths())
-
   private var scheduledTaskManagerRegistration: Option[Cancellable] = None
   private var currentRegistrationRun: UUID = UUID.randomUUID()
 
@@ -995,7 +990,7 @@ class TaskManager(
     }
     
     taskManagerMetricGroup = 
-      new TaskManagerMetricGroup(metricsRegistry, 
this.runtimeInfo.getHostname, id.toString)
+      new TaskManagerMetricGroup(metricsRegistry, location.getHostname, 
id.toString)
     
     
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, 
network)
     
@@ -1179,7 +1174,7 @@ class TaskManager(
         checkpointResponder,
         libCache,
         fileCache,
-        runtimeInfo,
+        config,
         taskMetricGroup,
         resultPartitionConsumableNotifier,
         partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 62110a7..d34bb40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -74,8 +75,7 @@ public class TestTaskContext<S, T> implements TaskContext<S, 
T> {
        
        public TestTaskContext(long memoryInBytes) {
                this.memoryManager = new MemoryManager(memoryInBytes, 1, 32 * 
1024, MemoryType.HEAP, true);
-               this.taskManageInfo = new TaskManagerRuntimeInfo(
-                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
+               this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 75f960e..3d4c45f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.TestLogger;
@@ -110,8 +111,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLog
                this.owner = new DummyInvokable();
                this.taskConfig = new TaskConfig(new Configuration());
                this.executionConfig = executionConfig;
-               this.taskManageInfo = new TaskManagerRuntimeInfo(
-                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
+               this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
        }
        
        @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 088435a..f43632c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
@@ -115,8 +116,7 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Ta
                this.owner = new DummyInvokable();
                this.taskConfig = new TaskConfig(new Configuration());
                this.executionConfig = executionConfig;
-               this.taskManageInfo = new TaskManagerRuntimeInfo(
-                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
+               this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
        }
 
        @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index c3ed6c0..d2d4094 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -45,12 +44,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.mockito.invocation.InvocationOnMock;
@@ -236,10 +233,7 @@ public class MockEnvironment implements Environment {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TaskManagerRuntimeInfo(
-                               "localhost",
-                               new UnmodifiableConfiguration(new 
Configuration()),
-                               System.getProperty("java.io.tmpdir"));
+               return new TestingTaskManagerRuntimeInfo();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index a94e694..85137cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -115,8 +116,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLogg
                this.executionConfig = executionConfig;
                this.comparators = new ArrayList<TypeComparator<IN>>(2);
 
-               this.taskManageInfo = new TaskManagerRuntimeInfo(
-                               "localhost", new Configuration(), 
System.getProperty("java.io.tmpdir"));
+               this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
        }
 
        @Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index f5fe52c..ecbd9b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -63,14 +63,11 @@ public class TaskExecutorTest extends TestLogger {
                        ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-                       
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new
 Configuration());
-                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new
 String[1]);
 
                        rpc.registerGateway(resourceManagerAddress, rmGateway);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
-                       
when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
                        NonHaServices haServices = new 
NonHaServices(resourceManagerAddress);
 
@@ -124,7 +121,7 @@ public class TaskExecutorTest extends TestLogger {
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
                        
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new
 Configuration());
-                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new
 String[1]);
+                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new
 String[1]);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
@@ -198,12 +195,9 @@ public class TaskExecutorTest extends TestLogger {
 
                        TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
                        
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-                       
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new
 Configuration());
-                       
PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new
 String[1]);
 
                        TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
                        
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
-                       
when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
                        TaskExecutor taskManager = new TaskExecutor(
                                taskManagerServicesConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index aea5294..090880f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
 import org.junit.Test;
@@ -179,7 +180,7 @@ public class TaskAsyncCallTest {
                        mock(CheckpointResponder.class),
                        libCache,
                        mock(FileCache.class),
-                       new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
+                       new TestingTaskManagerRuntimeInfo(),
                        mock(TaskMetricGroup.class),
                        consumableNotifier,
                        partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index fe618ff..50fc181 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.junit.After;
 import org.junit.Before;
@@ -648,7 +649,7 @@ public class TaskTest {
                        checkpointResponder,
                        libCache,
                        mock(FileCache.class),
-                       new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
+                       new TestingTaskManagerRuntimeInfo(),
                        mock(TaskMetricGroup.class),
                        consumableNotifier,
                        partitionStateChecker,

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
new file mode 100644
index 0000000..e56da97
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
+import java.io.File;
+
+/**
+ * TaskManagerRuntimeInfo implementation for testing purposes
+ */
+public class TestingTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo {
+
+       private final Configuration configuration;
+       private final String[] tmpDirectories;
+
+       public TestingTaskManagerRuntimeInfo() {
+               this(new Configuration(), 
System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator));
+       }
+
+       public TestingTaskManagerRuntimeInfo(Configuration configuration, 
String[] tmpDirectories) {
+               this.configuration = configuration;
+               this.tmpDirectories = tmpDirectories;
+       }
+
+       @Override
+       public Configuration getConfiguration() {
+               return configuration;
+       }
+
+       @Override
+       public String[] getTmpDirectories() {
+               return tmpDirectories;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index ffda126..fb1b3b3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -48,8 +48,8 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -167,8 +167,7 @@ public class InterruptSensitiveRestoreTest {
                                mock(CheckpointResponder.class),
                                new FallbackLibraryCacheManager(),
                                new FileCache(tmpDirectories),
-                               new TaskManagerRuntimeInfo(
-                                               "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
+                               new TestingTaskManagerRuntimeInfo(new Configuration(), tmpDirectories),
                                new UnregisteredTaskMetricsGroup(),
                                mock(ResultPartitionConsumableNotifier.class),
                                mock(PartitionStateChecker.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9b773d8..6f9d8dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -49,12 +49,10 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -336,7 +334,7 @@ public class StreamMockEnvironment implements Environment {
 
        @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
+               return new TestingTaskManagerRuntimeInfo();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/64ee1386/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 5a8ca04..205fba0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -221,7 +221,7 @@ public class StreamTaskTest {
                        mock(CheckpointResponder.class),
                        libCache,
                        mock(FileCache.class),
-                       new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+                       new TestingTaskManagerRuntimeInfo(),
                        mock(TaskMetricGroup.class),
                        consumableNotifier,
                        partitionStateChecker,

Reply via email to