[FLINK-4746] Make TaskManagerRuntimeInfo an interface Let the TaskManagerConfiguration implement the TaskManagerRuntimeInformation to make some of the TaskManager's configuration values accessible from different components.
This closes #2599. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4596f1e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4596f1e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4596f1e Branch: refs/heads/flip-6 Commit: a4596f1e21157066a3728f4c13209d41a3cd6858 Parents: 5275586 Author: Till Rohrmann <[email protected]> Authored: Wed Oct 5 14:47:24 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 20 19:48:30 2016 +0200 ---------------------------------------------------------------------- .../runtime/taskexecutor/TaskExecutor.java | 11 +--- .../taskexecutor/TaskManagerConfiguration.java | 22 +++---- .../taskmanager/TaskManagerRuntimeInfo.java | 61 ++------------------ .../flink/runtime/taskmanager/TaskManager.scala | 11 +--- .../operators/drivers/TestTaskContext.java | 4 +- .../testutils/BinaryOperatorTestBase.java | 4 +- .../operators/testutils/DriverTestBase.java | 4 +- .../operators/testutils/MockEnvironment.java | 8 +-- .../testutils/UnaryOperatorTestBase.java | 4 +- .../runtime/taskexecutor/TaskExecutorTest.java | 8 +-- .../runtime/taskmanager/TaskAsyncCallTest.java | 4 +- .../flink/runtime/taskmanager/TaskTest.java | 3 +- .../util/TestingTaskManagerRuntimeInfo.java | 52 +++++++++++++++++ .../tasks/InterruptSensitiveRestoreTest.java | 5 +- .../runtime/tasks/StreamMockEnvironment.java | 4 +- .../streaming/runtime/tasks/StreamTaskTest.java | 4 +- 16 files changed, 98 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 35b639b..a2716e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -71,7 +70,6 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.Preconditions; import java.util.HashSet; @@ -127,9 +125,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private final FileCache fileCache; - // TODO: Try to get rid of it - private final TaskManagerRuntimeInfo taskManagerRuntimeInfo; - // --------- resource manager -------- private TaskExecutorToResourceManagerConnection resourceManagerConnection; @@ -177,10 +172,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.broadcastVariableManager = checkNotNull(broadcastVariableManager); this.fileCache = checkNotNull(fileCache); - this.taskManagerRuntimeInfo = new TaskManagerRuntimeInfo( - taskManagerLocation.getHostname(), - new UnmodifiableConfiguration(taskManagerConfiguration.getConfiguration()), - taskManagerConfiguration.getTmpDirPaths()); this.jobManagerConnections = new HashMap<>(4); @@ -308,7 +299,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { checkpointResponder, libraryCache, fileCache, - taskManagerRuntimeInfo, + taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index bce3dc3..1d1e732 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +34,13 @@ import java.io.File; /** * Configuration object for {@link TaskExecutor}. */ -public class TaskManagerConfiguration { +public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class); private final int numberSlots; - private final String[] tmpDirPaths; + private final String[] tmpDirectories; private final Time timeout; // null indicates an infinite duration @@ -50,12 +51,11 @@ public class TaskManagerConfiguration { private final long cleanupInterval; - // TODO: remove necessity for complete configuration object - private final Configuration configuration; + private final UnmodifiableConfiguration configuration; public TaskManagerConfiguration( int numberSlots, - String[] tmpDirPaths, + String[] tmpDirectories, Time timeout, Time maxRegistrationDuration, Time initialRegistrationPause, @@ -65,7 +65,7 @@ public class TaskManagerConfiguration { Configuration configuration) { this.numberSlots = numberSlots; - this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths); + this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories); this.timeout = Preconditions.checkNotNull(timeout); this.maxRegistrationDuration = maxRegistrationDuration; this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause); @@ -79,10 +79,6 @@ public class TaskManagerConfiguration { return numberSlots; } - public String[] getTmpDirPaths() { - return tmpDirPaths; - } - public Time getTimeout() { return timeout; } @@ -107,10 +103,16 @@ public class TaskManagerConfiguration { return cleanupInterval; } + @Override public Configuration getConfiguration() { return configuration; } + @Override + public String[] getTmpDirectories() { + return tmpDirectories; + } + // -------------------------------------------------------------------------------------------- // Static factory methods // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java index 9ac982e..d1efe34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java @@ -20,71 +20,22 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.configuration.Configuration; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkArgument; - /** - * Encapsulation of TaskManager runtime information, like hostname and configuration. + * Interface to access {@link TaskManager} information. */ -public class TaskManagerRuntimeInfo implements java.io.Serializable { - - private static final long serialVersionUID = 5598219619760274072L; - - /** host name of the interface that the TaskManager uses to communicate */ - private final String hostname; - - /** configuration that the TaskManager was started with */ - private final Configuration configuration; - - /** list of temporary file directories */ - private final String[] tmpDirectories; - - /** - * Creates a runtime info. - * - * @param hostname The host name of the interface that the TaskManager uses to communicate. - * @param configuration The configuration that the TaskManager was started with. - * @param tmpDirectory The temporary file directory. - */ - public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) { - this(hostname, configuration, new String[] { tmpDirectory }); - } - - /** - * Creates a runtime info. - * @param hostname The host name of the interface that the TaskManager uses to communicate. - * @param configuration The configuration that the TaskManager was started with. - * @param tmpDirectories The list of temporary file directories. - */ - public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) { - checkArgument(tmpDirectories.length > 0); - this.hostname = checkNotNull(hostname); - this.configuration = checkNotNull(configuration); - this.tmpDirectories = tmpDirectories; - - } - - /** - * Gets host name of the interface that the TaskManager uses to communicate. - * @return The host name of the interface that the TaskManager uses to communicate. - */ - public String getHostname() { - return hostname; - } +public interface TaskManagerRuntimeInfo { /** * Gets the configuration that the TaskManager was started with. + * * @return The configuration that the TaskManager was started with. */ - public Configuration getConfiguration() { - return configuration; - } + Configuration getConfiguration(); /** * Gets the list of temporary file directories. + * * @return The list of temporary file directories. */ - public String[] getTmpDirectories() { - return tmpDirectories; - } + String[] getTmpDirectories(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7364ee0..1b56c92 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -150,7 +150,7 @@ class TaskManager( protected val bcVarManager = new BroadcastVariableManager() /** Handler for distributed files cached by this TaskManager */ - protected val fileCache = new FileCache(config.getTmpDirPaths()) + protected val fileCache = new FileCache(config.getTmpDirectories()) /** Registry of metrics periodically transmitted to the JobManager */ private val metricRegistry = TaskManager.createMetricsRegistry() @@ -184,11 +184,6 @@ class TaskManager( var leaderSessionID: Option[UUID] = None - private val runtimeInfo = new TaskManagerRuntimeInfo( - location.getHostname(), - new UnmodifiableConfiguration(config.getConfiguration()), - config.getTmpDirPaths()) - private var scheduledTaskManagerRegistration: Option[Cancellable] = None private var currentRegistrationRun: UUID = UUID.randomUUID() @@ -996,7 +991,7 @@ class TaskManager( } taskManagerMetricGroup = - new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString) + new TaskManagerMetricGroup(metricsRegistry, location.getHostname, id.toString) TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network) @@ -1180,7 +1175,7 @@ class TaskManager( checkpointResponder, libCache, fileCache, - runtimeInfo, + config, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index 62110a7..d34bb40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -74,8 +75,7 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> { public TestTaskContext(long memoryInBytes) { this.memoryManager = new MemoryManager(memoryInBytes, 1, 32 * 1024, MemoryType.HEAP, true); - this.taskManageInfo = new TaskManagerRuntimeInfo( - "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); + this.taskManageInfo = new TestingTaskManagerRuntimeInfo(); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 75f960e..3d4c45f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.TaskContext; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.apache.flink.util.TestLogger; @@ -110,8 +111,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog this.owner = new DummyInvokable(); this.taskConfig = new TaskConfig(new Configuration()); this.executionConfig = executionConfig; - this.taskManageInfo = new TaskManagerRuntimeInfo( - "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); + this.taskManageInfo = new TestingTaskManagerRuntimeInfo(); } @Parameterized.Parameters http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 088435a..f43632c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.testutils.recordutils.RecordComparator; import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -115,8 +116,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta this.owner = new DummyInvokable(); this.taskConfig = new TaskConfig(new Configuration()); this.executionConfig = executionConfig; - this.taskManageInfo = new TaskManagerRuntimeInfo( - "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); + this.taskManageInfo = new TestingTaskManagerRuntimeInfo(); } @Parameterized.Parameters http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 08b84cb..9b33071 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -48,8 +47,10 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.types.Record; import org.apache.flink.util.MutableObjectIterator; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -233,10 +234,7 @@ public class MockEnvironment implements Environment { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return new TaskManagerRuntimeInfo( - "localhost", - new UnmodifiableConfiguration(new Configuration()), - System.getProperty("java.io.tmpdir")); + return new TestingTaskManagerRuntimeInfo(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index a94e694..85137cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.ResettableDriver; import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -115,8 +116,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg this.executionConfig = executionConfig; this.comparators = new ArrayList<TypeComparator<IN>>(2); - this.taskManageInfo = new TaskManagerRuntimeInfo( - "localhost", new Configuration(), System.getProperty("java.io.tmpdir")); + this.taskManageInfo = new TestingTaskManagerRuntimeInfo(); } @Parameterized.Parameters http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index f5fe52c..ecbd9b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -63,14 +63,11 @@ public class TaskExecutorTest extends TestLogger { ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); - PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); - PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]); rpc.registerGateway(resourceManagerAddress, rmGateway); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); - when(taskManagerLocation.getHostname()).thenReturn("foobar"); NonHaServices haServices = new NonHaServices(resourceManagerAddress); @@ -124,7 +121,7 @@ public class TaskExecutorTest extends TestLogger { TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); - PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]); + PowerMockito.when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); @@ -198,12 +195,9 @@ public class TaskExecutorTest extends TestLogger { TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); - PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); - PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); - when(taskManagerLocation.getHostname()).thenReturn("foobar"); TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 3a87d86..8fa7463 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -49,7 +49,9 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.SerializedValue; + import org.junit.Before; import org.junit.Test; @@ -179,7 +181,7 @@ public class TaskAsyncCallTest { mock(CheckpointResponder.class), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), + new TestingTaskManagerRuntimeInfo(), mock(TaskMetricGroup.class), consumableNotifier, partitionStateChecker, http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index fe618ff..50fc181 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskMessages; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.SerializedValue; import org.junit.After; import org.junit.Before; @@ -648,7 +649,7 @@ public class TaskTest { checkpointResponder, libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), + new TestingTaskManagerRuntimeInfo(), mock(TaskMetricGroup.class), consumableNotifier, partitionStateChecker, http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java new file mode 100644 index 0000000..e56da97 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingTaskManagerRuntimeInfo.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; + +import java.io.File; + +/** + * TaskManagerRuntimeInfo implementation for testing purposes + */ +public class TestingTaskManagerRuntimeInfo implements TaskManagerRuntimeInfo { + + private final Configuration configuration; + private final String[] tmpDirectories; + + public TestingTaskManagerRuntimeInfo() { + this(new Configuration(), System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator)); + } + + public TestingTaskManagerRuntimeInfo(Configuration configuration, String[] tmpDirectories) { + this.configuration = configuration; + this.tmpDirectories = tmpDirectories; + } + + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public String[] getTmpDirectories() { + return tmpDirectories; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 861665f..1077052 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -49,8 +49,8 @@ import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerActions; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -175,8 +175,7 @@ public class InterruptSensitiveRestoreTest { mock(CheckpointResponder.class), new FallbackLibraryCacheManager(), new FileCache(tmpDirectories), - new TaskManagerRuntimeInfo( - "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), + new TestingTaskManagerRuntimeInfo(new Configuration(), tmpDirectories), new UnregisteredTaskMetricsGroup(), mock(ResultPartitionConsumableNotifier.class), mock(PartitionStateChecker.class), http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 36ecf59..a73a1e4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -52,6 +52,8 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -327,7 +329,7 @@ public class StreamMockEnvironment implements Environment { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")); + return new TestingTaskManagerRuntimeInfo(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a4596f1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index ee0839f..bb246f9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener; import org.apache.flink.runtime.taskmanager.TaskManagerActions; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -243,7 +243,7 @@ public class StreamTaskTest { mock(CheckpointResponder.class), libCache, mock(FileCache.class), - new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")), + new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}), new UnregisteredTaskMetricsGroup(), consumableNotifier, partitionStateChecker,
