[FLINK-2425] [FLINK-2426] [runtime] Add an unmodifiable config and provide access to task manager configuration and hostname inside RuntimeEnvironment
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb9cbf8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb9cbf8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb9cbf8 Branch: refs/heads/master Commit: 8eb9cbf88111b0375860a3965e093a736455db79 Parents: 24dee42 Author: Sachin Goel <[email protected]> Authored: Wed Jul 29 18:51:58 2015 +0530 Committer: Stephan Ewen <[email protected]> Committed: Mon Aug 3 18:48:07 2015 +0200 ---------------------------------------------------------------------- .../flink/configuration/Configuration.java | 2 +- .../UnmodifiableConfiguration.java | 100 +++++++++++++++++++ .../UnmodifiableConfigurationTest.java | 46 +++++++++ .../flink/runtime/execution/Environment.java | 10 ++ .../runtime/taskmanager/RuntimeEnvironment.java | 22 +++- .../apache/flink/runtime/taskmanager/Task.java | 9 +- .../taskmanager/RuntimeConfiguration.scala | 23 +++++ .../flink/runtime/taskmanager/TaskManager.scala | 7 +- .../operators/testutils/MockEnvironment.java | 11 ++ .../runtime/taskmanager/TaskAsyncCallTest.java | 10 +- .../flink/runtime/taskmanager/TaskTest.java | 6 +- .../runtime/tasks/StreamMockEnvironment.java | 11 ++ 12 files changed, 246 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index c095b5f..e9d7621 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -472,7 +472,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implement // -------------------------------------------------------------------------------------------- - private <T> void setValueInternal(String key, T value) { + <T> void setValueInternal(String key, T value) { if (key == null) { throw new NullPointerException("Key must not be null."); } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java new file mode 100644 index 0000000..b436a53 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unmodifiable version of the Configuration class + */ +public class UnmodifiableConfiguration extends Configuration { + + /** The log object used for debugging. */ + private static final Logger LOG = LoggerFactory.getLogger(UnmodifiableConfiguration.class); + + public UnmodifiableConfiguration(Configuration config) { + super(); + super.addAll(config); + } + + // -------------------------------------------------------------------------------------------- + // All setter methods must fail. + // -------------------------------------------------------------------------------------------- + + @Override + public final void setClass(String key, Class<?> klazz) { + error(); + } + + @Override + public final void setString(String key, String value) { + error(); + } + + @Override + public final void setInteger(String key, int value) { + error(); + } + + @Override + public final void setLong(String key, long value) { + error(); + } + + @Override + public final void setBoolean(String key, boolean value) { + error(); + } + + @Override + public final void setFloat(String key, float value) { + error(); + } + + @Override + public final void setDouble(String key, double value) { + error(); + } + + @Override + public final void setBytes(String key, byte[] bytes) { + error(); + } + + @Override + public final void addAll(Configuration other) { + error(); + } + + @Override + public final void addAll(Configuration other, String prefix) { + error(); + } + + @Override + <T> void setValueInternal(String key, T value){ + error(); + } + + private final void error(){ + throw new UnsupportedOperationException("The unmodifiable configuration object doesn't allow set methods."); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java new file mode 100644 index 0000000..302b72b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import java.lang.reflect.Method; + +/** + * This class verifies that the Unmodifiable Configuration class overrides all setter methods in + * Configuration. + */ +public class UnmodifiableConfigurationTest { + + private static Configuration pc = new Configuration(); + private static UnmodifiableConfiguration unConf = new UnmodifiableConfiguration(pc); + private static Class clazz = unConf.getClass(); + + @Test + public void testOverride() throws Exception{ + for(Method m : clazz.getMethods()){ + if(m.getName().indexOf("set") == 0 || m.getName().indexOf("add") == 0 ) { + assertEquals(clazz, m.getDeclaringClass()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index c561869..af29560 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -72,6 +72,16 @@ public interface Environment { Configuration getTaskConfiguration(); /** + * @return The task manager configuration + */ + Configuration getTaskManagerConfiguration(); + + /** + * @return Hostname of the task manager + */ + String getHostname(); + + /** * Returns the job-wide configuration object that was attached to the JobGraph. * * @return The job-wide configuration http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 5e276bf..c0dfee6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -75,6 +75,10 @@ public class RuntimeEnvironment implements Environment { private final AccumulatorRegistry accumulatorRegistry; + private Configuration taskManagerConfiguration; + + private String hostname; + // ------------------------------------------------------------------------ public RuntimeEnvironment( @@ -96,7 +100,8 @@ public class RuntimeEnvironment implements Environment { Map<String, Future<Path>> distCacheEntries, ResultPartitionWriter[] writers, InputGate[] inputGates, - ActorGateway jobManager) { + ActorGateway jobManager, + RuntimeConfiguration taskManagerConfig) { checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism); @@ -119,9 +124,10 @@ public class RuntimeEnvironment implements Environment { this.writers = checkNotNull(writers); this.inputGates = checkNotNull(inputGates); this.jobManager = checkNotNull(jobManager); + this.taskManagerConfiguration = checkNotNull(taskManagerConfig).configuration(); + this.hostname = taskManagerConfig.hostname(); } - // ------------------------------------------------------------------------ @Override @@ -168,7 +174,17 @@ public class RuntimeEnvironment implements Environment { public Configuration getTaskConfiguration() { return taskConfiguration; } - + + @Override + public Configuration getTaskManagerConfiguration(){ + return taskManagerConfiguration; + } + + @Override + public String getHostname(){ + return hostname; + } + @Override public ClassLoader getUserClassLoader() { return userCodeClassLoader; http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index c4f62fb..878a69a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -214,6 +214,9 @@ public class Task implements Runnable { * initialization, to be memory friendly */ private volatile SerializedValue<StateHandle<?>> operatorState; + /** Access to task manager configuration and host names*/ + private RuntimeConfiguration taskManagerConfig; + /** * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to * be undone in the case of a failing task deployment.</p> @@ -227,7 +230,8 @@ public class Task implements Runnable { ActorGateway jobManagerActor, FiniteDuration actorAskTimeout, LibraryCacheManager libraryCache, - FileCache fileCache) + FileCache fileCache, + RuntimeConfiguration taskManagerConfig) { checkArgument(tdd.getNumberOfSubtasks() > 0); checkArgument(tdd.getIndexInSubtaskGroup() >= 0); @@ -258,6 +262,7 @@ public class Task implements Runnable { this.libraryCache = checkNotNull(libraryCache); this.fileCache = checkNotNull(fileCache); this.network = checkNotNull(networkEnvironment); + this.taskManagerConfig = checkNotNull(taskManagerConfig); this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>(); @@ -510,7 +515,7 @@ public class Task implements Runnable { userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, accumulatorRegistry, splitProvider, distributedCacheEntries, - writers, inputGates, jobManager); + writers, inputGates, jobManager, taskManagerConfig); // let the task code create its readers and writers invokable.setEnvironment(env); http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala new file mode 100644 index 0000000..ef0e705 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager + +import org.apache.flink.configuration.UnmodifiableConfiguration + +case class RuntimeConfiguration(hostname: String, configuration: UnmodifiableConfiguration) http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 0ec1040..3ab271a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -36,7 +36,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet} import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger -import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException} +import org.apache.flink.configuration._ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage} @@ -892,7 +892,10 @@ class TaskManager( jobManagerGateway, config.timeout, libCache, - fileCache) + fileCache, + new RuntimeConfiguration( + self.path.toSerializationFormat, + new UnmodifiableConfiguration(config.configuration))) log.info(s"Received task ${task.getTaskNameWithSubtasks}") http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index b9cb416..b71b01e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.testutils; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -192,6 +193,16 @@ public class MockEnvironment implements Environment { } @Override + public Configuration getTaskManagerConfiguration(){ + return new UnmodifiableConfiguration(new Configuration()); + } + + @Override + public String getHostname(){ + return "localhost"; + } + + @Override public int getNumberOfSubtasks() { return 1; } http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 10a33c3..08f0094 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; @@ -148,16 +150,20 @@ public class TaskAsyncCallTest { Collections.<BlobKey>emptyList(), 0); + ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE; return new Task(tdd, mock(MemoryManager.class), mock(IOManager.class), networkEnvironment, mock(BroadcastVariableManager.class), - DummyActorGateway.INSTANCE, + taskManagerGateway, DummyActorGateway.INSTANCE, new FiniteDuration(60, TimeUnit.SECONDS), libCache, - mock(FileCache.class)); + mock(FileCache.class), + new RuntimeConfiguration( + taskManagerGateway.path(), + new UnmodifiableConfiguration(new Configuration()))); } public static class CheckpointsInOrderInvokable extends AbstractInvokable http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index c6a8cb8..6d9df6d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import com.google.common.collect.Maps; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -725,7 +726,10 @@ public class TaskTest { jobManagerGateway, new FiniteDuration(60, TimeUnit.SECONDS), libCache, - mock(FileCache.class)); + mock(FileCache.class), + new RuntimeConfiguration( + taskManagerGateway.path(), + new UnmodifiableConfiguration(new Configuration()))); } private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) { http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index bbc64f1..44013ef 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -290,5 +291,15 @@ public class StreamMockEnvironment implements Environment { @Override public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) { } + + @Override + public Configuration getTaskManagerConfiguration(){ + return new UnmodifiableConfiguration(new Configuration()); + } + + @Override + public String getHostname(){ + return "localhost"; + } }
