[FLINK-3904] enhancements to GlobalConfiguration

- fail if config couldn't be loaded
- remove duplicate api methods
- remove undocumented XML loading feature
- generate yaml conf in tests instead of xml conf
- only load one config file instead of all xml or yaml files (flink-conf.yaml)
- make globalconfiguration non-global and remove static SINGLETON
- fix test cases
- add test cases

This closes #2123


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5eb0e38f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5eb0e38f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5eb0e38f

Branch: refs/heads/master
Commit: 5eb0e38fbb346fea05932c3afc24a2d071e8974f
Parents: 12bf7c1
Author: Maximilian Michels <[email protected]>
Authored: Wed Jul 27 17:06:06 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Wed Jul 27 16:40:09 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   4 +-
 .../CliFrontendAddressConfigurationTest.java    |   5 -
 .../flink/client/CliFrontendListCancelTest.java |   1 -
 .../apache/flink/client/CliFrontendRunTest.java |   1 -
 .../flink/client/CliFrontendStopTest.java       |   2 -
 .../flink/client/CliFrontendTestUtils.java      |  28 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   8 +-
 .../apache/flink/storm/api/FlinkSubmitter.java  |   2 +-
 .../flink/api/common/io/BinaryOutputFormat.java |   2 +-
 .../api/common/io/DelimitedInputFormat.java     |  30 +-
 .../flink/api/common/io/FileInputFormat.java    |  25 +-
 .../flink/api/common/io/FileOutputFormat.java   |  34 +-
 .../api/common/io/GenericCsvInputFormat.java    |   8 +-
 .../apache/flink/api/common/io/InputFormat.java |   2 +-
 .../api/common/io/SerializedOutputFormat.java   |   2 +-
 .../configuration/GlobalConfiguration.java      | 395 +++----------------
 .../api/common/io/BinaryInputFormatTest.java    |   3 +-
 .../io/DelimitedInputFormatSamplingTest.java    |  44 ++-
 .../api/common/io/DelimitedInputFormatTest.java |   6 +-
 .../api/common/io/EnumerateNestedFilesTest.java |   4 +-
 .../api/common/io/FileOutputFormatTest.java     |  12 +-
 .../api/common/io/RichInputFormatTest.java      |   1 +
 .../api/common/io/RichOutputFormatTest.java     |   1 +
 .../FilesystemSchemeConfigTest.java             |  20 +-
 .../configuration/GlobalConfigurationTest.java  | 201 +++-------
 .../apache/flink/testutils/TestConfigUtils.java |  29 +-
 .../java/hadoop/mapred/utils/HadoopUtils.java   |  10 +-
 .../hadoop/mapreduce/utils/HadoopUtils.java     |  12 +-
 .../flink/api/java/io/PrimitiveInputFormat.java |   4 +-
 .../flink/api/java/io/TextInputFormat.java      |   2 +-
 .../flink/api/java/io/TextValueInputFormat.java |   2 +-
 .../flink/python/api/PythonPlanBinder.java      |  11 +-
 .../plantranslate/JobGraphGenerator.java        |  10 +-
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   1 -
 .../BlobLibraryCacheManagerTest.java            |   4 -
 .../org/apache/flink/api/scala/FlinkShell.scala |   4 +-
 .../flink/api/scala/ScalaShellITCase.scala      |   4 +-
 .../environment/StreamContextEnvironment.java   |   2 +-
 .../api/environment/StreamPlanEnvironment.java  |   2 +-
 ...CliFrontendYarnAddressConfigurationTest.java |   8 -
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   7 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 46 files changed, 273 insertions(+), 700 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a888841..7c2ee2e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -153,9 +153,7 @@ public class CliFrontend {
 
                // load the configuration
                LOG.info("Trying to load configuration file");
-               
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-               System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, 
configDirectory.getAbsolutePath());
-               this.config = GlobalConfiguration.getConfiguration();
+               this.config = 
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 
                try {
                        FileSystem.setDefaultScheme(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 41d8622..8320e04 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -48,11 +48,6 @@ public class CliFrontendAddressConfigurationTest {
                CliFrontendTestUtils.pipeSystemOutToNull();
        }
 
-       @Before
-       public void clearConfig() {
-               CliFrontendTestUtils.clearGlobalConfiguration();
-       }
-
        @Test
        public void testValidConfig() {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 736d859..524e7e7 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -55,7 +55,6 @@ public class CliFrontendListCancelTest {
        @BeforeClass
        public static void init() {
                CliFrontendTestUtils.pipeSystemOutToNull();
-               CliFrontendTestUtils.clearGlobalConfiguration();
        }
        
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index f710d8e..0326eab 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -36,7 +36,6 @@ public class CliFrontendRunTest {
        @BeforeClass
        public static void init() {
                CliFrontendTestUtils.pipeSystemOutToNull();
-               CliFrontendTestUtils.clearGlobalConfiguration();
        }
        
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index 7c34c75..9522ac7 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -35,7 +35,6 @@ import org.junit.Test;
 import java.util.UUID;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static 
org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration;
 import static org.junit.Assert.*;
 
 public class CliFrontendStopTest extends TestLogger {
@@ -45,7 +44,6 @@ public class CliFrontendStopTest extends TestLogger {
        @BeforeClass
        public static void setup() {
                pipeSystemOutToNull();
-               clearGlobalConfiguration();
                actorSystem = ActorSystem.create("TestingActorSystem");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 1872133..c411a7b 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -71,33 +71,7 @@ public class CliFrontendTestUtils {
                System.setOut(new PrintStream(new BlackholeOutputSteam()));
                System.setErr(new PrintStream(new BlackholeOutputSteam()));
        }
-       
-       public static void clearGlobalConfiguration() {
-               try {
-                       Field singletonInstanceField = 
GlobalConfiguration.class.getDeclaredField("SINGLETON");
-                       Field conf = 
GlobalConfiguration.class.getDeclaredField("config");
-                       Field map = 
Configuration.class.getDeclaredField("confData");
-                       
-                       singletonInstanceField.setAccessible(true);
-                       conf.setAccessible(true);
-                       map.setAccessible(true);
-                       
-                       GlobalConfiguration gconf = (GlobalConfiguration) 
singletonInstanceField.get(null);
-                       if (gconf != null) {
-                               Configuration confObject = (Configuration) 
conf.get(gconf);
-                               @SuppressWarnings("unchecked")
-                               Map<String, Object> confData = (Map<String, 
Object>) map.get(confObject);
-                               confData.clear();
-                       }
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test initialization caused an exception: " + 
e.getMessage());
-               }
-               
-       }
-       
+
        private static final class BlackholeOutputSteam extends 
java.io.OutputStream {
                @Override
                public void write(int b){}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 6ad250d..9628bb7 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -242,7 +242,7 @@ public class FlinkClient {
                        }
                }
 
-               final Configuration configuration = 
GlobalConfiguration.getConfiguration();
+               final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
                
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
this.jobManagerHost);
                
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
this.jobManagerPort);
 
@@ -271,7 +271,7 @@ public class FlinkClient {
         * @return Flink's internally used {@link JobID}.
         */
        JobID getTopologyJobId(final String id) {
-               final Configuration configuration = 
GlobalConfiguration.getConfiguration();
+               final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
                if (this.timeout != null) {
                        
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
                }
@@ -311,7 +311,7 @@ public class FlinkClient {
        }
 
        private FiniteDuration getTimeout() {
-               final Configuration configuration = 
GlobalConfiguration.getConfiguration();
+               final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
                if (this.timeout != null) {
                        
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
                }
@@ -320,7 +320,7 @@ public class FlinkClient {
        }
 
        private ActorRef getJobManager() throws IOException {
-               final Configuration configuration = 
GlobalConfiguration.getConfiguration();
+               final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
 
                ActorSystem actorSystem;
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index 13a39ef..f8932b1 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -87,7 +87,7 @@ public class FlinkSubmitter {
                        throw new IllegalArgumentException("Storm conf is not 
valid. Must be json-serializable");
                }
 
-               final Configuration flinkConfig = 
GlobalConfiguration.getConfiguration();
+               final Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration();
                if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
                        stormConf.put(Config.NIMBUS_HOST,
                                        
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
"localhost"));

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index a89e73e..059198c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -45,7 +45,7 @@ public abstract class BinaryOutputFormat<T> extends 
FileOutputFormat<T> {
        
        private transient DataOutputViewStreamWrapper outView;
 
-       
+
        @Override
        public void close() throws IOException {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 99aa022..fd02c82 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,13 +20,13 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -84,12 +84,18 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> imple
         */
        private static int MAX_SAMPLE_LEN;
 
-       static { loadGlobalConfigParams(); }
-       
+       /**
+        * @Deprecated Please use {@code loadConfigParameters(Configuration 
config}
+        */
+       @Deprecated
        protected static void loadGlobalConfigParams() {
-               int maxSamples = 
GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
+               loadConfigParameters(GlobalConfiguration.loadConfiguration());
+       }
+
+       protected static void loadConfigParameters(Configuration parameters) {
+               int maxSamples = 
parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
                                
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES);
-               int minSamples = 
GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
+               int minSamples = 
parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
                        
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MIN_LINE_SAMPLES);
                
                if (maxSamples < 0) {
@@ -113,7 +119,7 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> imple
                        DEFAULT_MIN_NUM_SAMPLES = minSamples;
                }
                
-               int maxLen = 
GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY,
+               int maxLen = 
parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY,
                                
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN);
                if (maxLen <= 0) {
                        maxLen = 
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN;
@@ -164,13 +170,17 @@ public abstract class DelimitedInputFormat<OT> extends 
FileInputFormat<OT> imple
        // 
--------------------------------------------------------------------------------------------
        //  Constructors & Getters/setters for the configurable parameters
        // 
--------------------------------------------------------------------------------------------
-       
+
        public DelimitedInputFormat() {
-               super();
+               this(null, null);
        }
-       
-       protected DelimitedInputFormat(Path filePath) {
+
+       protected DelimitedInputFormat(Path filePath, Configuration 
configuration) {
                super(filePath);
+               if (configuration == null) {
+                       configuration = GlobalConfiguration.loadConfiguration();
+               }
+               loadConfigParameters(configuration);
        }
        
        

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 95a1ffa..72d6061 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -87,15 +87,19 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
         * The splitLength is set to -1L for reading the whole split.
         */
        protected static final long READ_WHOLE_SPLIT_FLAG = -1L;
-       
+
        static {
-               initDefaultsFromConfiguration();
+               
initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
                initDefaultInflaterInputStreamFactories();
        }
-       
-       private static void initDefaultsFromConfiguration() {
-               
-               final long to = 
GlobalConfiguration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
+
+       /**
+        * Initialize defaults for input format. Needs to be a static method 
because it is configured for local
+        * cluster execution, see LocalFlinkMiniCluster.
+        * @param configuration The configuration to load defaults from
+        */
+       private static void initDefaultsFromConfiguration(Configuration 
configuration) {
+               final long to = 
configuration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
                        ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
                if (to < 0) {
                        LOG.error("Invalid timeout value for filesystem stream 
opening: " + to + ". Using default value of " +
@@ -154,10 +158,6 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
                }
        }
        
-       static long getDefaultOpeningTimeout() {
-               return DEFAULT_OPENING_TIMEOUT;
-       }
-       
        // 
--------------------------------------------------------------------------------------------
        //  Variables for internal operation.
        //  They are all transient, because we do not want them so be 
serialized 
@@ -224,11 +224,8 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
        // 
--------------------------------------------------------------------------------------------
 
 
        public FileInputFormat() {}
-       
+
        protected FileInputFormat(Path filePath) {
-               if (filePath == null) {
-                       throw new IllegalArgumentException("The file path must 
not be null.");
-               }
                this.filePath = filePath;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 557c342..7530ba1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -22,11 +22,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -62,26 +62,30 @@ public abstract class FileOutputFormat<IT> extends 
RichOutputFormat<IT> implemen
        private static WriteMode DEFAULT_WRITE_MODE;
        
        private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
-       
-       
-       private static final void initDefaultsFromConfiguration(Configuration 
configuration) {
-               final boolean overwrite = 
configuration.getBoolean(ConfigConstants
-                                               
.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
+
+       static {
+               
initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
+       }
+
+       /**
+        * Initialize defaults for output format. Needs to be a static method 
because it is configured for local
+        * cluster execution, see LocalFlinkMiniCluster.
+        * @param configuration The configuration to load defaults from
+        */
+       private static void initDefaultsFromConfiguration(Configuration 
configuration) {
+               final boolean overwrite = configuration.getBoolean(
+                               
ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
                                ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
        
                DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : 
WriteMode.NO_OVERWRITE;
                
-               final boolean alwaysCreateDirectory = 
configuration.getBoolean(ConfigConstants
-                                               
.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
+               final boolean alwaysCreateDirectory = configuration.getBoolean(
+                       
ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
                        
ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);
        
                DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? 
OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
        }
-       
-       static {
-               
initDefaultsFromConfiguration(GlobalConfiguration.getConfiguration());
-       }
-       
+
        // 
--------------------------------------------------------------------------------------------
 
        
        /**
@@ -121,9 +125,9 @@ public abstract class FileOutputFormat<IT> extends 
RichOutputFormat<IT> implemen
        private transient boolean fileCreated;
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        public FileOutputFormat() {}
-       
+
        public FileOutputFormat(Path outputPath) {
                this.outputFilePath = outputPath;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index e2c54ad..85d9cd8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -98,15 +98,15 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        // 
--------------------------------------------------------------------------------------------
        //  Constructors and getters/setters for the configurable parameters
        // 
--------------------------------------------------------------------------------------------
-       
+
        protected GenericCsvInputFormat() {
                super();
        }
-       
+
        protected GenericCsvInputFormat(Path filePath) {
-               super(filePath);
+               super(filePath, null);
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
 
        public int getNumberOfFieldsTotal() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
index 0e978b9..300c237 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
@@ -70,7 +70,7 @@ public interface InputFormat<OT, T extends InputSplit> 
extends InputSplitSource<
         * <p>
         * This method is always called first on a newly instantiated input 
format. 
         *  
-        * @param parameters The configuration with all parameters.
+        * @param parameters The configuration with all parameters (note: not 
the Flink config but the TaskConfig).
         */
        void configure(Configuration parameters);
        

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
index 0ef160e..edbe1a0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.DataOutputView;
 public class SerializedOutputFormat<T extends IOReadableWritable> extends 
BinaryOutputFormat<T> {
        
        private static final long serialVersionUID = 1L;
-       
+
        @Override
        protected void serialize(T record, DataOutputView dataOutputView) 
throws IOException {
                record.write(dataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 7e50486..8d550d7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -21,21 +21,12 @@ package org.apache.flink.configuration;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
 import org.apache.flink.annotation.Internal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
 
 /**
  * Global configuration object for Flink. Similar to Java properties 
configuration
@@ -44,159 +35,62 @@ import org.w3c.dom.Text;
 @Internal
 public final class GlobalConfiguration {
 
-       /** The log object used for debugging. */
        private static final Logger LOG = 
LoggerFactory.getLogger(GlobalConfiguration.class);
 
-       /** The global configuration object accessible through a singleton 
pattern. */
-       private static GlobalConfiguration SINGLETON = null;
-
-       /** The internal map holding the key-value pairs the configuration 
consists of. */
-       private final Configuration config = new Configuration();
+       public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
 
        // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Retrieves the singleton object of the global configuration.
-        * 
-        * @return the global configuration object
-        */
-       private static GlobalConfiguration get() {
-               // lazy initialization currently only for testibility
-               synchronized (GlobalConfiguration.class) {
-                       if (SINGLETON == null) {
-                               SINGLETON = new GlobalConfiguration();
-                       }
-                       return SINGLETON;
-               }
-       }
 
-       /**
-        * The constructor used to construct the singleton instance of the 
global configuration.
-        */
        private GlobalConfiguration() {}
 
        // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Returns the value associated with the given key as a string.
-        * 
-        * @param key
-        *        the key pointing to the associated value
-        * @param defaultValue
-        *        the default value which is returned in case there is no value 
associated with the given key
-        * @return the (default) value associated with the given key
-        */
-       public static String getString(String key, String defaultValue) {
-               return get().config.getString(key, defaultValue);
-       }
-
-       /**
-        * Returns the value associated with the given key as a long integer.
-        * 
-        * @param key
-        *        the key pointing to the associated value
-        * @param defaultValue
-        *        the default value which is returned in case there is no value 
associated with the given key
-        * @return the (default) value associated with the given key
-        */
-       public static long getLong(String key, long defaultValue) {
-               return get().config.getLong(key, defaultValue);
-       }
 
        /**
-        * Returns the value associated with the given key as an integer.
-        * 
-        * @param key
-        *        the key pointing to the associated value
-        * @param defaultValue
-        *        the default value which is returned in case there is no value 
associated with the given key
-        * @return the (default) value associated with the given key
-        */
-       public static int getInteger(String key, int defaultValue) {
-               return get().config.getInteger(key, defaultValue);
-       }
-       
-       /**
-        * Returns the value associated with the given key as a float.
-        * 
-        * @param key
-        *        the key pointing to the associated value
-        * @param defaultValue
-        *        the default value which is returned in case there is no value 
associated with the given key
-        * @return the (default) value associated with the given key
+        * Loads the global configuration from the environment. Fails if an 
error occurs during loading. Returns an
+        * empty configuration object if the environment variable is not set. 
In production this variable is set but
+        * tests and local execution/debugging don't have this environment 
variable set. That's why we should fail
+        * if it is not set.
+        * @return Returns the Configuration
         */
-       public static float getFloat(String key, float defaultValue) {
-               return get().config.getFloat(key, defaultValue);
-       }
-
-       /**
-        * Returns the value associated with the given key as a boolean.
-        * 
-        * @param key
-        *        the key pointing to the associated value
-        * @param defaultValue
-        *        the default value which is returned in case there is no value 
associated with the given key
-        * @return the (default) value associated with the given key
-        */
-       public static boolean getBoolean(String key, boolean defaultValue) {
-               return get().config.getBoolean(key, defaultValue);
+       public static Configuration loadConfiguration() {
+               final String configDir = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+               if (configDir == null) {
+                       return new Configuration();
+               }
+               return loadConfiguration(configDir);
        }
 
        /**
         * Loads the configuration files from the specified directory.
         * <p>
-        * XML and YAML are supported as configuration files. If both XML and 
YAML files exist in the configuration
-        * directory, keys from YAML will overwrite keys from XML.
+        * YAML files are supported as configuration files.
         * 
         * @param configDir
         *        the directory which contains the configuration files
         */
-       public static void loadConfiguration(final String configDir) {
+       public static Configuration loadConfiguration(final String configDir) {
 
                if (configDir == null) {
-                       LOG.warn("Given configuration directory is null, cannot 
load configuration");
-                       return;
+                       throw new IllegalArgumentException("Given configuration 
directory is null, cannot load configuration");
                }
 
                final File confDirFile = new File(configDir);
                if (!(confDirFile.exists())) {
-                       LOG.warn("The given configuration directory name '" + 
configDir + "' (" + confDirFile.getAbsolutePath()
-                               + ") does not describe an existing directory.");
-                       return;
+                       throw new IllegalConfigurationException(
+                               "The given configuration directory name '" + 
configDir +
+                                       "' (" + confDirFile.getAbsolutePath() + 
") does not describe an existing directory.");
                }
-               
-               if (confDirFile.isFile()) {
-                       final File file = new File(configDir);
-                       if(configDir.endsWith(".xml")) {
-                               get().loadXMLResource( file );
-                       } else if(configDir.endsWith(".yaml")) {
-                               get().loadYAMLResource(file);
-                       } else {
-                               LOG.warn("The given configuration has an 
unknown extension.");
-                               return;
-                       }
-                       return;
-               }
-
-               // get all XML and YAML files in the directory
-               final File[] xmlFiles = filterFilesBySuffix(confDirFile, 
".xml");
-               final File[] yamlFiles = filterFilesBySuffix(confDirFile, new 
String[] { ".yaml", ".yml" });
 
-               if ((xmlFiles == null || xmlFiles.length == 0) && (yamlFiles == 
null || yamlFiles.length == 0)) {
-                       LOG.warn("Unable to get the contents of the config 
directory '" + configDir + "' ("
-                               + confDirFile.getAbsolutePath() + ").");
-                       return;
-               }
+               // get Flink yaml configuration file
+               final File yamlConfigFile = new File(confDirFile, 
FLINK_CONF_FILENAME);
 
-               // load config files and write into config map
-               for (File f : xmlFiles) {
-                       get().loadXMLResource(f);
+               if (!yamlConfigFile.exists()) {
+                       throw new IllegalConfigurationException(
+                               "The Flink config file '" + yamlConfigFile +
+                                       "' (" + confDirFile.getAbsolutePath() + 
") does not exist.");
                }
 
-               // => if both XML and YAML files exist, the YAML config keys 
overwrite XML settings
-               for (File f : yamlFiles) {
-                       get().loadYAMLResource(f);
-               }
+               return loadYAMLResource(yamlConfigFile);
        }
 
        /**
@@ -219,237 +113,46 @@ public final class GlobalConfiguration {
         * @param file the YAML file to read from
         * @see <a href="http://www.yaml.org/spec/1.2/spec.html";>YAML 1.2 
specification</a>
         */
-       private void loadYAMLResource(File file) {
-
-               synchronized (getClass()) {
-
-                       BufferedReader reader = null;
-                       try {
-                               reader = new BufferedReader(new 
InputStreamReader(new FileInputStream(file)));
-       
-                               String line = null;
-                               while ((line = reader.readLine()) != null) {
-       
-                                       // 1. check for comments
-                                       String[] comments = line.split("#", 2);
-                                       String conf = comments[0];
-       
-                                       // 2. get key and value
-                                       if (conf.length() > 0) {
-                                               String[] kv = conf.split(": ", 
2);
-       
-                                               // skip line with no valid 
key-value pair
-                                               if (kv.length == 1) {
-                                                       LOG.warn("Error while 
trying to split key and value in configuration file " + file + ": " + line);
-                                                       continue;
-                                               }
-       
-                                               String key = kv[0].trim();
-                                               String value = kv[1].trim();
-                                               
-                                               // sanity check
-                                               if (key.length() == 0 || 
value.length() == 0) {
-                                                       LOG.warn("Error after 
splitting key and value in configuration file " + file + ": " + line);
-                                                       continue;
-                                               }
-       
-                                               LOG.debug("Loading 
configuration property: {}, {}", key, value);
-       
-                                               this.config.setString(key, 
value);
-                                       }
-                               }
-                       }
-                       catch (IOException e) {
-                               LOG.error("Error parsing YAML configuration.", 
e);
-                       }
-                       finally {
-                               try {
-                                       if(reader != null) {
-                                               reader.close();
-                                       }
-                               } catch (IOException e) {
-                                       LOG.warn("Cannot to close reader with 
IOException.", e);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Loads an XML document of key-values pairs.
-        * 
-        * @param file
-        *        the XML document file
-        */
-       private void loadXMLResource(File file) {
+       private static Configuration loadYAMLResource(File file) {
+               final Configuration config = new Configuration();
 
-               final DocumentBuilderFactory docBuilderFactory = 
DocumentBuilderFactory.newInstance();
-               // Ignore comments in the XML file
-               docBuilderFactory.setIgnoringComments(true);
-               docBuilderFactory.setNamespaceAware(true);
+               try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(new FileInputStream(file)))){
 
-               try {
+                       String line;
+                       while ((line = reader.readLine()) != null) {
 
-                       final DocumentBuilder builder = 
docBuilderFactory.newDocumentBuilder();
-                       Document doc;
-                       Element root;
+                               // 1. check for comments
+                               String[] comments = line.split("#", 2);
+                               String conf = comments[0];
 
-                       doc = builder.parse(file);
-
-                       if (doc == null) {
-                               LOG.warn("Cannot load configuration: doc is 
null");
-                               return;
-                       }
+                               // 2. get key and value
+                               if (conf.length() > 0) {
+                                       String[] kv = conf.split(": ", 2);
 
-                       root = doc.getDocumentElement();
-                       if (root == null) {
-                               LOG.warn("Cannot load configuration: root is 
null");
-                               return;
-                       }
-
-                       if (!"configuration".equals(root.getNodeName())) {
-                               return;
-                       }
-
-                       final NodeList props = root.getChildNodes();
-                       int propNumber = -1;
-
-                       synchronized (getClass()) {
-
-                               for (int i = 0; i < props.getLength(); i++) {
-
-                                       final Node propNode = props.item(i);
-                                       String key = null;
-                                       String value = null;
-
-                                       // Ignore text at this point
-                                       if (propNode instanceof Text) {
+                                       // skip line with no valid key-value 
pair
+                                       if (kv.length == 1) {
+                                               LOG.warn("Error while trying to 
split key and value in configuration file " + file + ": " + line);
                                                continue;
                                        }
 
-                                       if (!(propNode instanceof Element)) {
-                                               continue;
-                                       }
-
-                                       Element property = (Element) propNode;
-                                       if 
(!"property".equals(property.getNodeName())) {
-                                               continue;
-                                       }
+                                       String key = kv[0].trim();
+                                       String value = kv[1].trim();
 
-                                       propNumber++;
-                                       final NodeList propChildren = 
property.getChildNodes();
-                                       if (propChildren == null) {
-                                               LOG.warn("Error while reading 
configuration: property has no children, skipping...");
+                                       // sanity check
+                                       if (key.length() == 0 || value.length() 
== 0) {
+                                               LOG.warn("Error after splitting 
key and value in configuration file " + file + ": " + line);
                                                continue;
                                        }
 
-                                       for (int j = 0; j < 
propChildren.getLength(); j++) {
-
-                                               final Node propChild = 
propChildren.item(j);
-                                               if (propChild instanceof 
Element) {
-                                                       if 
("key".equals(propChild.getNodeName())) {
-                                                               if 
(propChild.getChildNodes() != null) {
-                                                                       if 
(propChild.getChildNodes().getLength() == 1) {
-                                                                               
if (propChild.getChildNodes().item(0) instanceof Text) {
-                                                                               
        final Text t = (Text) propChild.getChildNodes().item(0);
-                                                                               
        key = t.getTextContent();
-                                                                               
}
-                                                                       }
-                                                               }
-                                                       }
-
-                                                       if 
("value".equals(propChild.getNodeName())) {
-                                                               if 
(propChild.getChildNodes() != null) {
-                                                                       if 
(propChild.getChildNodes().getLength() == 1) {
-                                                                               
if (propChild.getChildNodes().item(0) instanceof Text) {
-                                                                               
        final Text t = (Text) propChild.getChildNodes().item(0);
-                                                                               
        value = t.getTextContent();
-                                                                               
}
-                                                                       }
-                                                               }
-                                                       }
-                                               }
-                                       }
-
-                                       if (key != null && value != null) {
-                                               // Put key, value pair into the 
map
-                                               LOG.debug("Loading 
configuration property: {}, {}", key, value);
-                                               this.config.setString(key, 
value);
-                                       } else {
-                                               LOG.warn("Error while reading 
configuration: Cannot read property " + propNumber);
-                                       }
+                                       LOG.debug("Loading configuration 
property: {}, {}", key, value);
+                                       config.setString(key, value);
                                }
                        }
-
-               }
-               catch (Exception e) {
-                       LOG.error("Cannot load configuration.", e);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error parsing YAML 
configuration.", e);
                }
-       }
 
-       /**
-        * Gets a {@link Configuration} object with the values of this 
GlobalConfiguration
-        * 
-        * @return the {@link Configuration} object including the key/value 
pairs
-        */
-       public static Configuration getConfiguration() {
-               Configuration copy = new Configuration();
-               copy.addAll(get().config);
-               return copy;
-       }
-
-       /**
-        * Merges the given {@link Configuration} object into the global
-        * configuration. If a key/value pair with an identical already
-        * exists in the global configuration, it is overwritten by the
-        * pair of the {@link Configuration} object.
-        * 
-        * @param conf
-        *        the {@link Configuration} object to merge into the global 
configuration
-        */
-       public static void includeConfiguration(Configuration conf) {
-               get().includeConfigurationInternal(conf);
-       }
-
-       /**
-        * Internal non-static method to include configuration.
-        * 
-        * @param conf
-        *        the {@link Configuration} object to merge into the global 
configuration
-        */
-       private void includeConfigurationInternal(Configuration conf) {
-               // static synchronized
-               synchronized (getClass()) {
-                       this.config.addAll(conf);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Filters files in directory which have the specified suffix (e.g. 
".xml").
-        * 
-        * @param dirToFilter
-        *        directory to filter
-        * @param suffix
-        *        suffix to filter files by (e.g. ".xml")
-        * @return files with given ending in directory
-        */
-       private static File[] filterFilesBySuffix(final File dirToFilter, final 
String suffix) {
-               return filterFilesBySuffix(dirToFilter, new String[] { suffix 
});
+               return config;
        }
 
-       private static File[] filterFilesBySuffix(final File dirToFilter, final 
String[] suffixes) {
-               return dirToFilter.listFiles(new FilenameFilter() {
-                       @Override
-                       public boolean accept(final File dir, final String 
name) {
-                               for (String suffix : suffixes) {
-                                       if (dir.equals(dirToFilter) && name != 
null && name.endsWith(suffix)) {
-                                               return true;
-                                       }
-                               }
-
-                               return false;
-                       }
-               });
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index 90b366c..a7374e3 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -41,7 +41,8 @@ public class BinaryInputFormatTest {
                        return record;
                }
        }
-       
+
+
        @Test
        public void testCreateInputSplitsWithOneFile() throws IOException {
                // create temporary file with 3 blocks

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
index fac979e..be73798 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+
 public class DelimitedInputFormatSamplingTest {
        
        private static final String TEST_DATA1 = 
@@ -66,6 +67,8 @@ public class DelimitedInputFormatSamplingTest {
        
        private static final int DEFAULT_NUM_SAMPLES = 4;
        
+       private static Configuration CONFIG;
+       
        // 
========================================================================
        //  Setup
        // 
========================================================================
@@ -80,16 +83,17 @@ public class DelimitedInputFormatSamplingTest {
                
                try {
                        // make sure we do 4 samples
-                       TestConfigUtils.loadGlobalConf(
+                       CONFIG = TestConfigUtils.loadGlobalConf(
                                new String[] { 
ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
                                                                
ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY },
                                new String[] { "4", "4" });
-                       
-                       TestDelimitedInputFormat.prepare();
+
+
                } catch (Throwable t) {
                        Assert.fail("Could not load the global configuration.");
                }
        }
+
        
        // 
========================================================================
        //  Tests
@@ -101,7 +105,7 @@ public class DelimitedInputFormatSamplingTest {
                        final String tempFile = 
TestFileUtils.createTempFile(TEST_DATA1);
                        final Configuration conf = new Configuration();
                        
-                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat(CONFIG);
                        format.setFilePath(tempFile.replace("file", "test"));
                        format.configure(conf);
                        
@@ -109,7 +113,7 @@ public class DelimitedInputFormatSamplingTest {
                        format.getStatistics(null);
                        Assert.assertEquals("Wrong number of samples taken.", 
DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
                        
-                       TestDelimitedInputFormat format2 = new 
TestDelimitedInputFormat();
+                       TestDelimitedInputFormat format2 = new 
TestDelimitedInputFormat(CONFIG);
                        format2.setFilePath(tempFile.replace("file", "test"));
                        format2.setNumLineSamples(8);
                        format2.configure(conf);
@@ -130,7 +134,7 @@ public class DelimitedInputFormatSamplingTest {
                        final String tempFile = 
TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA1, TEST_DATA1, TEST_DATA1);
                        final Configuration conf = new Configuration();
                        
-                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat(CONFIG);
                        format.setFilePath(tempFile.replace("file", "test"));
                        format.configure(conf);
                        
@@ -138,7 +142,7 @@ public class DelimitedInputFormatSamplingTest {
                        format.getStatistics(null);
                        Assert.assertEquals("Wrong number of samples taken.", 
DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
                        
-                       TestDelimitedInputFormat format2 = new 
TestDelimitedInputFormat();
+                       TestDelimitedInputFormat format2 = new 
TestDelimitedInputFormat(CONFIG);
                        format2.setFilePath(tempFile.replace("file", "test"));
                        format2.setNumLineSamples(8);
                        format2.configure(conf);
@@ -159,7 +163,7 @@ public class DelimitedInputFormatSamplingTest {
                        final String tempFile = 
TestFileUtils.createTempFile(TEST_DATA1);
                        final Configuration conf = new Configuration();
                        
-                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat(CONFIG);
                        format.setFilePath(tempFile);
                        format.configure(conf);
                        BaseStatistics stats = format.getStatistics(null);
@@ -180,7 +184,7 @@ public class DelimitedInputFormatSamplingTest {
                        final String tempFile = 
TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA2);
                        final Configuration conf = new Configuration();
                        
-                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat(CONFIG);
                        format.setFilePath(tempFile);
                        format.configure(conf);
                        BaseStatistics stats = format.getStatistics(null);
@@ -212,7 +216,7 @@ public class DelimitedInputFormatSamplingTest {
                        final String tempFile = 
TestFileUtils.createTempFile(testData);
                        final Configuration conf = new Configuration();
                        
-                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat(CONFIG);
                        format.setFilePath(tempFile);
                        format.setDelimiter(DELIMITER);
                        format.configure(conf);
@@ -235,7 +239,7 @@ public class DelimitedInputFormatSamplingTest {
                        final String tempFile = TestFileUtils.createTempFile(2 
* ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN);
                        final Configuration conf = new Configuration();
                        
-                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat(CONFIG);
                        format.setFilePath(tempFile);
                        format.configure(conf);
                        
@@ -252,7 +256,7 @@ public class DelimitedInputFormatSamplingTest {
                        final String tempFile = 
TestFileUtils.createTempFile(TEST_DATA1);
                        final Configuration conf = new Configuration();
                        
-                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format = new 
TestDelimitedInputFormat(CONFIG);
                        format.setFilePath("test://" + tempFile);
                        format.configure(conf);
                        
@@ -260,7 +264,7 @@ public class DelimitedInputFormatSamplingTest {
                        BaseStatistics stats = format.getStatistics(null);
                        Assert.assertEquals("Wrong number of samples taken.", 
DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
                        
-                       final TestDelimitedInputFormat format2 = new 
TestDelimitedInputFormat();
+                       final TestDelimitedInputFormat format2 = new 
TestDelimitedInputFormat(CONFIG);
                        format2.setFilePath("test://" + tempFile);
                        format2.configure(conf);
                        
@@ -274,21 +278,21 @@ public class DelimitedInputFormatSamplingTest {
                        Assert.fail(e.getMessage());
                }
        }
-       
+
        // 
========================================================================
        //  Mocks
        // 
========================================================================
-       
+
        private static final class TestDelimitedInputFormat extends 
DelimitedInputFormat<IntValue> {
                private static final long serialVersionUID = 1L;
-               
+
+               TestDelimitedInputFormat(Configuration configuration) {
+                       super(null, configuration);
+               }
+
                @Override
                public IntValue readRecord(IntValue reuse, byte[] bytes, int 
offset, int numBytes) {
                        throw new UnsupportedOperationException();
                }
-               
-               public static void prepare() {
-                       DelimitedInputFormat.loadGlobalConfigParams();
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 599a640..8a31099 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 
@@ -47,17 +48,18 @@ import org.junit.Test;
 
 public class DelimitedInputFormatTest {
        
-       private final DelimitedInputFormat<String> format = new 
MyTextInputFormat();
+       private DelimitedInputFormat<String> format;
        
        // 
--------------------------------------------------------------------------------------------
 
        @Before
        public void setup() {
+               format = new MyTextInputFormat();
                this.format.setFilePath(new 
Path("file:///some/file/that/will/not/be/read"));
        }
        
        @After
-       public void setdown() throws Exception {
+       public void shutdown() throws Exception {
                if (this.format != null) {
                        this.format.close();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index 68465a3..1076338 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.testutils.TestFileUtils;
@@ -37,11 +38,12 @@ public class EnumerateNestedFilesTest {
        protected Configuration config;
        final String tempPath = System.getProperty("java.io.tmpdir");
 
-       private final DummyFileInputFormat format = new DummyFileInputFormat();
+       private DummyFileInputFormat format;
 
        @Before
        public void setup() {
                this.config = new Configuration();
+               format = new DummyFileInputFormat();
        }
 
        @After

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index cc040b6..4a598f2 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -22,13 +22,12 @@ package org.apache.flink.api.common.io;
 import java.io.File;
 import java.io.IOException;
 
-import org.junit.Assert;
-
 import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.types.IntValue;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;
@@ -38,11 +37,8 @@ public class FileOutputFormatTest {
        @Test
        public void testCreateNonParallelLocalFS() throws IOException {
 
-               File tmpOutPath = null;
-               File tmpOutFile = null;
-
-               tmpOutPath = File.createTempFile("fileOutputFormatTest", 
"Test1");
-               tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+               File tmpOutPath = File.createTempFile("fileOutputFormatTest", 
"Test1");
+               File tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
 
                String tmpFilePath = tmpOutPath.toURI().toString();
 
@@ -652,8 +648,10 @@ public class FileOutputFormatTest {
        // 
-------------------------------------------------------------------------------------------
        
        public static class DummyFileOutputFormat extends 
FileOutputFormat<IntValue> {
+
                private static final long serialVersionUID = 1L;
                public boolean testFileName = false;
+
                @Override
                public void writeRecord(IntValue record) throws IOException {
                        // DO NOTHING

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
index ae0f8e5..c3cbb58 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
@@ -52,4 +52,5 @@ public class RichInputFormatTest {
                
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
                
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
index 296af11..4c303a6 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
@@ -52,4 +52,5 @@ public class RichOutputFormatTest {
                
assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
                
assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
index 3e1a723..0cf5e32 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
@@ -38,15 +37,6 @@ import static org.junit.Assert.fail;
 
 public class FilesystemSchemeConfigTest {
 
-       @Before
-       public void resetSingleton() throws SecurityException, 
NoSuchFieldException, IllegalArgumentException,
-               IllegalAccessException {
-               // reset GlobalConfiguration between tests
-               Field instance = 
GlobalConfiguration.class.getDeclaredField("SINGLETON");
-               instance.setAccessible(true);
-               instance.set(null, null);
-       }
-       
        @Test
        public void testExplicitFilesystemScheme() {
                testSettingFilesystemScheme(false, "fs.default-scheme: 
otherFS://localhost:1234/", true);
@@ -65,7 +55,12 @@ public class FilesystemSchemeConfigTest {
        private void testSettingFilesystemScheme(boolean useDefaultScheme,
                                                                                
        String configFileScheme, boolean useExplicitScheme) {
                final File tmpDir = getTmpDir();
-               final File confFile = createRandomFile(tmpDir, ".yaml");
+               final File confFile = new File(tmpDir, 
GlobalConfiguration.FLINK_CONF_FILENAME);
+               try {
+                       confFile.createNewFile();
+               } catch (IOException e) {
+                       throw new RuntimeException("Couldn't create file", e);
+               }
                final File testFile = new File(tmpDir.getAbsolutePath() + 
File.separator + "testing.txt");
 
                try {
@@ -83,8 +78,7 @@ public class FilesystemSchemeConfigTest {
                                fail(e.getMessage());
                        }
 
-                       
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-                       Configuration conf = 
GlobalConfiguration.getConfiguration();
+                       Configuration conf = 
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
 
                        try {
                                FileSystem.setDefaultScheme(conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index ce55d2e..6336a73 100644
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -19,114 +19,58 @@
 package org.apache.flink.configuration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.PrintWriter;
-import java.lang.reflect.Field;
 import java.util.UUID;
 
-import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * This class contains tests for the global configuration (parsing 
configuration directory information).
  */
 public class GlobalConfigurationTest extends TestLogger {
 
-       @Before
-       public void resetSingleton() throws SecurityException, 
NoSuchFieldException, IllegalArgumentException,
-                       IllegalAccessException {
-               // reset GlobalConfiguration between tests
-               Field instance = 
GlobalConfiguration.class.getDeclaredField("SINGLETON");
-               instance.setAccessible(true);
-               instance.set(null, null);
-       }
-       
-       @Test
-       public void testConfigurationMixed() {
-               File tmpDir = getTmpDir();
-               File confFile1 = createRandomFile(tmpDir, ".yaml");
-               File confFile2 = createRandomFile(tmpDir, ".xml");
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
 
-               try {
-                       try {
-                               PrintWriter pw1 = new PrintWriter(confFile1);
-                               PrintWriter pw2 = new PrintWriter(confFile2);
-                               
-                               pw1.println("mykey1: myvalue1_YAML");
-                               pw1.println("mykey2: myvalue2");
-                               
-                               pw2.println("<configuration>");
-                               
pw2.println("<property><key>mykey1</key><value>myvalue1_XML</value></property>");
-                               
pw2.println("<property><key>mykey3</key><value>myvalue3</value></property>");
-                               pw2.println("</configuration>");
-                               
-                               pw1.close();
-                               pw2.close();
-                       } catch (FileNotFoundException e) {
-                               e.printStackTrace();
-                       }
-                       
-                       
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-                       Configuration conf = 
GlobalConfiguration.getConfiguration();
-                       
-                       // all distinct keys from confFile1 + confFile2key
-                       assertEquals(3, conf.keySet().size());
-                       
-                       // keys 1, 2, 3 should be OK and match the expected 
values
-                       // => configuration keys from YAML should overwrite 
keys from XML
-                       assertEquals("myvalue1_YAML", conf.getString("mykey1", 
null));
-                       assertEquals("myvalue2", conf.getString("mykey2", 
null));
-                       assertEquals("myvalue3", conf.getString("mykey3", 
null));
-               } finally {
-                       confFile1.delete();
-                       confFile2.delete();
-                       tmpDir.delete();
-               }
-       }
-       
        @Test
        public void testConfigurationYAML() {
-               File tmpDir = getTmpDir();
-               File confFile1 = createRandomFile(tmpDir, ".yaml");
-               File confFile2 = createRandomFile(tmpDir, ".yml");
+               File tmpDir = tempFolder.getRoot();
+               File confFile = new File(tmpDir, 
GlobalConfiguration.FLINK_CONF_FILENAME);
 
                try {
-                       try {
-                               PrintWriter pw1 = new PrintWriter(confFile1);
-                               PrintWriter pw2 = new PrintWriter(confFile2);
-
-                               pw1.println("###########################"); // 
should be skipped
-                               pw1.println("# Some : comments : to skip"); // 
should be skipped
-                               pw1.println("###########################"); // 
should be skipped
-                               pw1.println("mykey1: myvalue1"); // OK, simple 
correct case
-                               pw1.println("mykey2       : myvalue2"); // OK, 
whitespace before colon is correct
-                               pw1.println("mykey3:myvalue3"); // SKIP, 
missing white space after colon
-                               pw1.println(" some nonsense without colon and 
whitespace separator"); // SKIP
-                               pw1.println(" :  "); // SKIP
-                               pw1.println("   "); // SKIP
-                               pw1.println("mykey4: myvalue4# some comments"); 
// OK, skip comments only
-                               pw1.println("   mykey5    :    myvalue5    "); 
// OK, trim unnecessary whitespace
-                               pw1.println("mykey6: my: value6"); // OK, only 
use first ': ' as separator
-                               pw1.println("mykey7: "); // SKIP, no value 
provided
-                               pw1.println(": myvalue8"); // SKIP, no key 
provided
-
-                               pw2.println("mykey9: myvalue9"); // OK
-                               pw2.println("mykey9: myvalue10"); // OK, 
overwrite last value
-
-                               pw1.close();
-                               pw2.close();
+                       try (final PrintWriter pw = new PrintWriter(confFile)) {
+
+                               pw.println("###########################"); // 
should be skipped
+                               pw.println("# Some : comments : to skip"); // 
should be skipped
+                               pw.println("###########################"); // 
should be skipped
+                               pw.println("mykey1: myvalue1"); // OK, simple 
correct case
+                               pw.println("mykey2       : myvalue2"); // OK, 
whitespace before colon is correct
+                               pw.println("mykey3:myvalue3"); // SKIP, missing 
white space after colon
+                               pw.println(" some nonsense without colon and 
whitespace separator"); // SKIP
+                               pw.println(" :  "); // SKIP
+                               pw.println("   "); // SKIP
+                               pw.println("mykey4: myvalue4# some comments"); 
// OK, skip comments only
+                               pw.println("   mykey5    :    myvalue5    "); 
// OK, trim unnecessary whitespace
+                               pw.println("mykey6: my: value6"); // OK, only 
use first ': ' as separator
+                               pw.println("mykey7: "); // SKIP, no value 
provided
+                               pw.println(": myvalue8"); // SKIP, no key 
provided
+
+                               pw.println("mykey9: myvalue9"); // OK
+                               pw.println("mykey9: myvalue10"); // OK, 
overwrite last value
+
                        } catch (FileNotFoundException e) {
                                e.printStackTrace();
                        }
 
-                       
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-                       Configuration conf = 
GlobalConfiguration.getConfiguration();
+                       Configuration conf = 
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
 
                        // all distinct keys from confFile1 + confFile2 key
                        assertEquals(6, conf.keySet().size());
@@ -142,83 +86,36 @@ public class GlobalConfigurationTest extends TestLogger {
                        assertEquals("null", conf.getString("mykey8", "null"));
                        assertEquals("myvalue10", conf.getString("mykey9", 
null));
                } finally {
-                       confFile1.delete();
-                       confFile2.delete();
+                       confFile.delete();
                        tmpDir.delete();
                }
        }
 
-       /**
-        * This test creates several configuration files with values and 
cross-checks the resulting
-        * {@link GlobalConfiguration} object.
-        */
-       @Test
-       public void testConfigurationXML() {
-
-               // Create temporary directory for configuration files
-               final File tmpDir = getTmpDir();
-               final File confFile1 = createRandomFile(tmpDir, ".xml");
-               final File confFile2 = createRandomFile(tmpDir, ".xml");
-
-               try {
-                       try {
-                               final PrintWriter pw1 = new 
PrintWriter(confFile1);
-                               final PrintWriter pw2 = new 
PrintWriter(confFile2);
-
-                               pw1.append("<configuration>");
-                               pw2.append("<configuration>");
-
-                               
pw1.append("<property><key>mykey1</key><value>myvalue1</value></property>");
-                               pw1.append("<property></property>");
-                               
pw1.append("<property><key></key><value></value></property>");
-                               
pw1.append("<property><key>hello</key><value></value></property>");
-                               
pw1.append("<property><key>mykey2</key><value>myvalue2</value></property>");
-                               
pw2.append("<property><key>mykey3</key><value>myvalue3</value></property>");
-                               
pw2.append("<property><key>mykey4</key><value>myvalue4</value></property>");
-
-                               pw1.append("</configuration>");
-                               pw2.append("</configuration>");
-                               pw1.close();
-                               pw2.close();
-                       } catch (FileNotFoundException e) {
-                               fail(e.getMessage());
-                       }
-
-                       
GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
+       @Test(expected = IllegalArgumentException.class)
+       public void testFailIfNull() {
+               GlobalConfiguration.loadConfiguration(null);
+       }
 
-                       final Configuration co = 
GlobalConfiguration.getConfiguration();
+       @Test(expected = IllegalConfigurationException.class)
+       public void testFailIfNotLoaded() {
+               GlobalConfiguration.loadConfiguration("/some/path/" + 
UUID.randomUUID());
+       }
 
-                       assertEquals(co.getString("mykey1", "null"), 
"myvalue1");
-                       assertEquals(co.getString("mykey2", "null"), 
"myvalue2");
-                       assertEquals(co.getString("mykey3", "null"), 
"myvalue3");
-                       assertEquals(co.getString("mykey4", "null"), 
"myvalue4");
+       @Test(expected = IllegalConfigurationException.class)
+       public void testInvalidConfiguration() throws IOException {
+               
GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath());
+       }
 
-                       // // Test (wrong) string-to integer conversion. should 
return default value.
-                       // semantics are changed to throw an exception upon 
invalid parsing!
-                       // assertEquals(co.getInteger("mykey1", 500), 500);
-                       // assertEquals(co.getInteger("anything", 500), 500);
-                       // assertEquals(co.getBoolean("notexistent", true), 
true);
+       @Test
+       // We allow malformed YAML files
+       public void testInvalidYamlFile() throws IOException {
+               final File confFile = 
tempFolder.newFile(GlobalConfiguration.FLINK_CONF_FILENAME);
 
-                       // Test include local configuration
-                       final Configuration newconf = new Configuration();
-                       newconf.setInteger("mynewinteger", 1000);
-                       GlobalConfiguration.includeConfiguration(newconf);
-                       
assertEquals(GlobalConfiguration.getInteger("mynewinteger", 0), 1000);
-               } finally {
-                       // Remove temporary files
-                       confFile1.delete();
-                       confFile2.delete();
-                       tmpDir.delete();
+               try (PrintWriter pw = new PrintWriter(confFile);) {
+                       pw.append("invalid");
                }
-       }
 
-       private File getTmpDir() {
-               File tmpDir = new File(CommonTestUtils.getTempDir(), 
UUID.randomUUID().toString());
-               assertTrue(tmpDir.mkdirs());
-               return tmpDir;
+               
assertNotNull(GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath()));
        }
 
-       private File createRandomFile(File path, String suffix) {
-               return new File(path, UUID.randomUUID().toString() + suffix);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java 
b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
index 6096f69..d34f20a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 
 /**
@@ -30,20 +31,20 @@ import org.apache.flink.configuration.GlobalConfiguration;
  */
 public final class TestConfigUtils {
        
-       public static void loadGlobalConf(String[] keys, String[] values) 
throws IOException {
-               loadGlobalConf(getConfAsString(keys, values));
+       public static Configuration loadGlobalConf(String[] keys, String[] 
values) throws IOException {
+               return loadGlobalConf(getConfAsString(keys, values));
        }
        
-       public static void loadGlobalConf(String contents) throws IOException {
+       public static Configuration loadGlobalConf(String contents) throws 
IOException {
                final File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
-               File confDir = null;
+               File confDir;
                do {
                        confDir = new File(tempDir, 
TestFileUtils.randomFileName());
                } while (confDir.exists());
                
                try {
                        confDir.mkdirs();
-                       final File confFile = new File(confDir, 
"tempConfig.xml");
+                       final File confFile = new File(confDir, 
GlobalConfiguration.FLINK_CONF_FILENAME);
                
                        try {
                                BufferedWriter writer = new BufferedWriter(new 
FileWriter(confFile));
@@ -52,7 +53,7 @@ public final class TestConfigUtils {
                                } finally {
                                        writer.close();
                                }
-                               
GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath());
+                               return 
GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath());
                        } finally {
                                confFile.delete();
                        }
@@ -61,25 +62,25 @@ public final class TestConfigUtils {
                        confDir.delete();
                }
        }
-       
+
        public static String getConfAsString(String[] keys, String[] values) {
                if (keys == null || values == null || keys.length != 
values.length) {
                        throw new IllegalArgumentException();
                }
-               
+
                StringBuilder bld = new StringBuilder();
-               bld.append("<?xml version=\"1.0\" 
encoding=\"UTF-8\"?>\n<configuration>\n");
-               
+
                for (int i = 0; i < keys.length; i++) {
-                       
bld.append("<property>\n<key>").append(keys[i]).append("</key>\n");
-                       
bld.append("<value>").append(values[i]).append("</value>\n</property>\n");
+                       bld.append(keys[i]);
+                       bld.append(": ");
+                       bld.append(values[i]);
+                       bld.append(System.lineSeparator());
                }
-               bld.append("</configuration>\n");
                return bld.toString();
        }
 
        // 
------------------------------------------------------------------------
-       
+
        private TestConfigUtils() {}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index ab4e993..7c41eaf 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -104,13 +104,17 @@ public final class HadoopUtils {
         * This method is public because its being used in the HadoopDataSource.
         */
        public static org.apache.hadoop.conf.Configuration 
getHadoopConfiguration() {
+
+               org.apache.flink.configuration.Configuration flinkConfiguration 
=
+                       GlobalConfiguration.loadConfiguration();
+
                Configuration retConf = new 
org.apache.hadoop.conf.Configuration();
 
                // We need to load both core-site.xml and hdfs-site.xml to 
determine the default fs path and
                // the hdfs configuration
                // Try to load HDFS configuration from Hadoop's own 
configuration files
                // 1. approach: Flink configuration
-               final String hdfsDefaultPath = 
GlobalConfiguration.getString(ConfigConstants
+               final String hdfsDefaultPath = 
flinkConfiguration.getString(ConfigConstants
                                .HDFS_DEFAULT_CONFIG, null);
                if (hdfsDefaultPath != null) {
                        retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsDefaultPath));
@@ -118,7 +122,7 @@ public final class HadoopUtils {
                        LOG.debug("Cannot find hdfs-default configuration 
file");
                }
 
-               final String hdfsSitePath = 
GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+               final String hdfsSitePath = 
flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
                if (hdfsSitePath != null) {
                        retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsSitePath));
                } else {
@@ -127,7 +131,7 @@ public final class HadoopUtils {
 
                // 2. Approach environment variables
                String[] possibleHadoopConfPaths = new String[4];
-               possibleHadoopConfPaths[0] = 
GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+               possibleHadoopConfPaths[0] = 
flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
                possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
 
                if (System.getenv("HADOOP_HOME") != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
index b219de4..52fd734 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -37,12 +37,14 @@ public final class HadoopUtils {
        /**
         * Merge HadoopConfiguration into Configuration. This is necessary for 
the HDFS configuration.
         */
-       public static void mergeHadoopConf(Configuration configuration) {
-               Configuration hadoopConf = 
org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
-               
+       public static void mergeHadoopConf(Configuration hadoopConfig) {
+
+               Configuration hadoopConf =
+                       
org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
+
                for (Map.Entry<String, String> e : hadoopConf) {
-                       if (configuration.get(e.getKey()) == null) {
-                               configuration.set(e.getKey(), e.getValue());
+                       if (hadoopConfig.get(e.getKey()) == null) {
+                               hadoopConfig.set(e.getKey(), e.getValue());
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 75b82cd..05ed6fa 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -46,12 +46,12 @@ public class PrimitiveInputFormat<OT> extends 
DelimitedInputFormat<OT> {
 
 
        public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
-               super(filePath);
+               super(filePath, null);
                this.primitiveClass = primitiveClass;
        }
 
        public PrimitiveInputFormat(Path filePath, String delimiter, Class<OT> 
primitiveClass) {
-               super(filePath);
+               super(filePath, null);
                this.primitiveClass = primitiveClass;
                this.setDelimiter(delimiter);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index d6a02f1..b2554bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -50,7 +50,7 @@ public class TextInputFormat extends 
DelimitedInputFormat<String> {
        // 
--------------------------------------------------------------------------------------------
        
        public TextInputFormat(Path filePath) {
-               super(filePath);
+               super(filePath, null);
        }
        
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
index a0d20d6..45a2e3e 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
@@ -50,7 +50,7 @@ public class TextValueInputFormat extends 
DelimitedInputFormat<StringValue> {
        // 
--------------------------------------------------------------------------------------------
        
        public TextValueInputFormat(Path filePath) {
-               super(filePath);
+               super(filePath, null);
        }
        
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a6cbfa8..d55b9d4 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -73,8 +73,10 @@ public class PythonPlanBinder {
        public static final String FLINK_PYTHON3_BINARY_KEY = 
"python.binary.python3";
        public static final String PLANBINDER_CONFIG_BCVAR_COUNT = 
"PLANBINDER_BCVAR_COUNT";
        public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = 
"PLANBINDER_BCVAR_";
-       public static String FLINK_PYTHON2_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-       public static String FLINK_PYTHON3_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+       public static String FLINK_PYTHON2_BINARY_PATH =
+               
GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON2_BINARY_KEY, 
"python");
+       public static String FLINK_PYTHON3_BINARY_PATH =
+               
GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON3_BINARY_KEY, 
"python3");
 
        private static final Random r = new Random();
 
@@ -113,8 +115,9 @@ public class PythonPlanBinder {
        }
 
        public PythonPlanBinder() throws IOException {
-               FLINK_PYTHON2_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-               FLINK_PYTHON3_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+               Configuration conf = GlobalConfiguration.loadConfiguration();
+               FLINK_PYTHON2_BINARY_PATH = 
conf.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+               FLINK_PYTHON3_BINARY_PATH = 
conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
                FULL_PATH = FLINK_DIR != null
                                //command-line
                                ? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 12c5dfc..5ab1fbf 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -49,7 +50,6 @@ import 
org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plan.WorksetPlanNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -107,10 +107,11 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
        
        public static final String MERGE_ITERATION_AUX_TASKS_KEY = 
"compiler.merge-iteration-aux";
        
-       private static final boolean mergeIterationAuxTasks = 
GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
-       
+       private static final boolean mergeIterationAuxTasks =
+               
GlobalConfiguration.loadConfiguration().getBoolean(MERGE_ITERATION_AUX_TASKS_KEY,
 false);
+
        private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new 
TaskInChain(null, null, null, null);
-       
+
        // 
------------------------------------------------------------------------
 
        private Map<PlanNode, JobVertex> vertices; // a map from optimizer 
nodes to job vertices
@@ -156,7 +157,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                this.useLargeRecordHandler = config.getBoolean(
                                ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
                                
ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);
-
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 4e05ebe..5d7173b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -176,20 +176,24 @@ public final class HadoopFileSystem extends FileSystem 
implements HadoopFileSyst
         * This method is public because its being used in the HadoopDataSource.
         */
        public static org.apache.hadoop.conf.Configuration 
getHadoopConfiguration() {
+
+               org.apache.flink.configuration.Configuration flinkConfiguration 
=
+                       GlobalConfiguration.loadConfiguration();
+
                Configuration retConf = new 
org.apache.hadoop.conf.Configuration();
 
                // We need to load both core-site.xml and hdfs-site.xml to 
determine the default fs path and
                // the hdfs configuration
                // Try to load HDFS configuration from Hadoop's own 
configuration files
                // 1. approach: Flink configuration
-               final String hdfsDefaultPath = 
GlobalConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+               final String hdfsDefaultPath = 
flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
                if (hdfsDefaultPath != null) {
                        retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsDefaultPath));
                } else {
                        LOG.debug("Cannot find hdfs-default configuration 
file");
                }
 
-               final String hdfsSitePath = 
GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+               final String hdfsSitePath = 
flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
                if (hdfsSitePath != null) {
                        retConf.addResource(new 
org.apache.hadoop.fs.Path(hdfsSitePath));
                } else {
@@ -198,7 +202,7 @@ public final class HadoopFileSystem extends FileSystem 
implements HadoopFileSyst
                
                // 2. Approach environment variables
                String[] possibleHadoopConfPaths = new String[4]; 
-               possibleHadoopConfPaths[0] = 
GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+               possibleHadoopConfPaths[0] = 
flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
                possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
                
                if (System.getenv("HADOOP_HOME") != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f14a37f..84d38c1 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2350,8 +2350,7 @@ object JobManager {
     }
 
     LOG.info("Loading configuration from " + configDir)
-    GlobalConfiguration.loadConfiguration(configDir)
-    val configuration = GlobalConfiguration.getConfiguration()
+    val configuration = GlobalConfiguration.loadConfiguration(configDir)
 
     try {
       FileSystem.setDefaultScheme(configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a7dd789..226fa75 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1526,7 +1526,6 @@ object TaskManager {
     val conf: Configuration = try {
       LOG.info("Loading configuration from " + cliConfig.getConfigDir())
       GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir())
-      GlobalConfiguration.getConfiguration()
     }
     catch {
       case e: Exception => throw new Exception("Could not load configuration", 
e)

Reply via email to