[FLINK-2425] [FLINK-2426] [runtime] Add an unmodifiable config and provide 
access to task manager configuration and hostname inside RuntimeEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb9cbf8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb9cbf8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb9cbf8

Branch: refs/heads/master
Commit: 8eb9cbf88111b0375860a3965e093a736455db79
Parents: 24dee42
Author: Sachin Goel <[email protected]>
Authored: Wed Jul 29 18:51:58 2015 +0530
Committer: Stephan Ewen <[email protected]>
Committed: Mon Aug 3 18:48:07 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |   2 +-
 .../UnmodifiableConfiguration.java              | 100 +++++++++++++++++++
 .../UnmodifiableConfigurationTest.java          |  46 +++++++++
 .../flink/runtime/execution/Environment.java    |  10 ++
 .../runtime/taskmanager/RuntimeEnvironment.java |  22 +++-
 .../apache/flink/runtime/taskmanager/Task.java  |   9 +-
 .../taskmanager/RuntimeConfiguration.scala      |  23 +++++
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../operators/testutils/MockEnvironment.java    |  11 ++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  10 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   6 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  11 ++
 12 files changed, 246 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index c095b5f..e9d7621 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -472,7 +472,7 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters implement
 
        // 
--------------------------------------------------------------------------------------------
        
-       private <T> void setValueInternal(String key, T value) {
+       <T> void setValueInternal(String key, T value) {
                if (key == null) {
                        throw new NullPointerException("Key must not be null.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
new file mode 100644
index 0000000..b436a53
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/UnmodifiableConfiguration.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unmodifiable version of the Configuration class; every setter throws UnsupportedOperationException.
+ */
+public class UnmodifiableConfiguration extends Configuration {
+
+       /** The log object used for debugging. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(UnmodifiableConfiguration.class);
+
+       public UnmodifiableConfiguration(Configuration config) {
+               super();
+               super.addAll(config);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  All setter methods must fail.
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public final void setClass(String key, Class<?> klazz) {
+               error();
+       }
+
+       @Override
+       public final void setString(String key, String value) {
+               error();
+       }
+
+       @Override
+       public final void setInteger(String key, int value) {
+               error();
+       }
+
+       @Override
+       public final void setLong(String key, long value) {
+               error();
+       }
+
+       @Override
+       public final void setBoolean(String key, boolean value) {
+               error();
+       }
+
+       @Override
+       public final void setFloat(String key, float value) {
+               error();
+       }
+
+       @Override
+       public final void setDouble(String key, double value) {
+               error();
+       }
+
+       @Override
+       public final void setBytes(String key, byte[] bytes) {
+               error();
+       }
+
+       @Override
+       public final void addAll(Configuration other) {
+               error();
+       }
+
+       @Override
+       public final void addAll(Configuration other, String prefix) {
+               error();
+       }
+
+       @Override
+       <T> void setValueInternal(String key, T value){
+               error();
+       }
+
+       private final void error(){
+               throw new UnsupportedOperationException("The unmodifiable 
configuration object doesn't allow set methods.");
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
new file mode 100644
index 0000000..302b72b
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+/**
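+ * This class verifies that UnmodifiableConfiguration overrides all public 
methods of Configuration whose names start with "set" or "add", so that none of
+ * the inherited mutators remains callable on an unmodifiable instance.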
+ */
+public class UnmodifiableConfigurationTest {
+
+       private static Configuration pc = new Configuration();
+       private static UnmodifiableConfiguration unConf = new 
UnmodifiableConfiguration(pc);
+       private static Class clazz = unConf.getClass();
+
+       @Test
+       public void testOverride() throws Exception{
+               for(Method m : clazz.getMethods()){
+                       if(m.getName().indexOf("set") == 0 || 
m.getName().indexOf("add") == 0 ) {
+                               assertEquals(clazz, m.getDeclaringClass());
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index c561869..af29560 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -72,6 +72,16 @@ public interface Environment {
        Configuration getTaskConfiguration();
 
        /**
+        * @return The task manager configuration
+        */
+       Configuration getTaskManagerConfiguration();
+
+       /**
+        * @return Hostname of the task manager
+        */
+       String getHostname();
+
+       /**
         * Returns the job-wide configuration object that was attached to the 
JobGraph.
         *
         * @return The job-wide configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 5e276bf..c0dfee6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,6 +75,10 @@ public class RuntimeEnvironment implements Environment {
 
        private final AccumulatorRegistry accumulatorRegistry;
 
+       private Configuration taskManagerConfiguration;
+
+       private String hostname;
+
        // 
------------------------------------------------------------------------
 
        public RuntimeEnvironment(
@@ -96,7 +100,8 @@ public class RuntimeEnvironment implements Environment {
                        Map<String, Future<Path>> distCacheEntries,
                        ResultPartitionWriter[] writers,
                        InputGate[] inputGates,
-                       ActorGateway jobManager) {
+                       ActorGateway jobManager,
+                       RuntimeConfiguration taskManagerConfig) {
                
                checkArgument(parallelism > 0 && subtaskIndex >= 0 && 
subtaskIndex < parallelism);
 
@@ -119,9 +124,10 @@ public class RuntimeEnvironment implements Environment {
                this.writers = checkNotNull(writers);
                this.inputGates = checkNotNull(inputGates);
                this.jobManager = checkNotNull(jobManager);
+               this.taskManagerConfiguration = 
checkNotNull(taskManagerConfig).configuration();
+               this.hostname = taskManagerConfig.hostname();
        }
 
-
        // 
------------------------------------------------------------------------
        
        @Override
@@ -168,7 +174,17 @@ public class RuntimeEnvironment implements Environment {
        public Configuration getTaskConfiguration() {
                return taskConfiguration;
        }
-       
+
+       @Override
+       public Configuration getTaskManagerConfiguration(){
+               return taskManagerConfiguration;
+       }
+
+       @Override
+       public String getHostname(){
+               return hostname;
+       }
+
        @Override
        public ClassLoader getUserClassLoader() {
                return userCodeClassLoader;

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c4f62fb..878a69a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -214,6 +214,9 @@ public class Task implements Runnable {
         * initialization, to be memory friendly */
        private volatile SerializedValue<StateHandle<?>> operatorState;
 
+       /** Access to the task manager configuration and hostname. */
+       private RuntimeConfiguration taskManagerConfig;
+
        /**
         * <p><b>IMPORTANT:</b> This constructor may not start any work that 
would need to 
         * be undone in the case of a failing task deployment.</p>
@@ -227,7 +230,8 @@ public class Task implements Runnable {
                                ActorGateway jobManagerActor,
                                FiniteDuration actorAskTimeout,
                                LibraryCacheManager libraryCache,
-                               FileCache fileCache)
+                               FileCache fileCache,
+                               RuntimeConfiguration taskManagerConfig)
        {
                checkArgument(tdd.getNumberOfSubtasks() > 0);
                checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
@@ -258,6 +262,7 @@ public class Task implements Runnable {
                this.libraryCache = checkNotNull(libraryCache);
                this.fileCache = checkNotNull(fileCache);
                this.network = checkNotNull(networkEnvironment);
+               this.taskManagerConfig = checkNotNull(taskManagerConfig);
 
                this.executionListenerActors = new 
CopyOnWriteArrayList<ActorGateway>();
 
@@ -510,7 +515,7 @@ public class Task implements Runnable {
                                        userCodeClassLoader, memoryManager, 
ioManager,
                                        broadcastVariableManager, 
accumulatorRegistry,
                                        splitProvider, distributedCacheEntries,
-                                       writers, inputGates, jobManager);
+                                       writers, inputGates, jobManager, 
taskManagerConfig);
 
                        // let the task code create its readers and writers
                        invokable.setEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
new file mode 100644
index 0000000..ef0e705
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/RuntimeConfiguration.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager
+
+import org.apache.flink.configuration.UnmodifiableConfiguration
+
+case class RuntimeConfiguration(hostname: String, configuration: 
UnmodifiableConfiguration)

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0ec1040..3ab271a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -36,7 +36,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, 
GarbageCollectorMetricSet}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
-import org.apache.flink.configuration.{Configuration, ConfigConstants, 
GlobalConfiguration, IllegalConfigurationException}
+import org.apache.flink.configuration._
 
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, 
TriggerCheckpoint, AbstractCheckpointMessage}
@@ -892,7 +892,10 @@ class TaskManager(
         jobManagerGateway,
         config.timeout,
         libCache,
-        fileCache)
+        fileCache,
+        new RuntimeConfiguration(
+          self.path.toSerializationFormat,
+          new UnmodifiableConfiguration(config.configuration)))
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b9cb416..b71b01e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -192,6 +193,16 @@ public class MockEnvironment implements Environment {
        }
 
        @Override
+       public Configuration getTaskManagerConfiguration(){
+               return new UnmodifiableConfiguration(new Configuration());
+       }
+
+       @Override
+       public String getHostname(){
+               return "localhost";
+       }
+
+       @Override
        public int getNumberOfSubtasks() {
                return 1;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 10a33c3..08f0094 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -148,16 +150,20 @@ public class TaskAsyncCallTest {
                                Collections.<BlobKey>emptyList(),
                                0);
 
+               ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;
                return new Task(tdd,
                                mock(MemoryManager.class),
                                mock(IOManager.class),
                                networkEnvironment,
                                mock(BroadcastVariableManager.class),
-                               DummyActorGateway.INSTANCE,
+                               taskManagerGateway,
                                DummyActorGateway.INSTANCE,
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
-                               mock(FileCache.class));
+                               mock(FileCache.class),
+                               new RuntimeConfiguration(
+                                               taskManagerGateway.path(),
+                                               new 
UnmodifiableConfiguration(new Configuration())));
        }
        
        public static class CheckpointsInOrderInvokable extends 
AbstractInvokable

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index c6a8cb8..6d9df6d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -725,7 +726,10 @@ public class TaskTest {
                                jobManagerGateway,
                                new FiniteDuration(60, TimeUnit.SECONDS),
                                libCache,
-                               mock(FileCache.class));
+                               mock(FileCache.class),
+                               new RuntimeConfiguration(
+                                               taskManagerGateway.path(),
+                                               new 
UnmodifiableConfiguration(new Configuration())));
        }
 
        private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? 
extends AbstractInvokable> invokable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8eb9cbf8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index bbc64f1..44013ef 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -290,5 +291,15 @@ public class StreamMockEnvironment implements Environment {
        @Override
        public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {
        }
+
+       @Override
+       public Configuration getTaskManagerConfiguration(){
+               return new UnmodifiableConfiguration(new Configuration());
+       }
+
+       @Override
+       public String getHostname(){
+               return "localhost";
+       }
 }
 

Reply via email to