[FLINK-3904] Enhancements to GlobalConfiguration - fail if the config couldn't be loaded - remove duplicate API methods - remove undocumented XML loading feature - generate YAML conf in tests instead of XML conf - only load one config file (flink-conf.yaml) instead of all XML or YAML files - make GlobalConfiguration non-global and remove static SINGLETON - fix test cases - add test cases
This closes #2123 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5eb0e38f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5eb0e38f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5eb0e38f Branch: refs/heads/master Commit: 5eb0e38fbb346fea05932c3afc24a2d071e8974f Parents: 12bf7c1 Author: Maximilian Michels <[email protected]> Authored: Wed Jul 27 17:06:06 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Wed Jul 27 16:40:09 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 4 +- .../CliFrontendAddressConfigurationTest.java | 5 - .../flink/client/CliFrontendListCancelTest.java | 1 - .../apache/flink/client/CliFrontendRunTest.java | 1 - .../flink/client/CliFrontendStopTest.java | 2 - .../flink/client/CliFrontendTestUtils.java | 28 +- .../org/apache/flink/storm/api/FlinkClient.java | 8 +- .../apache/flink/storm/api/FlinkSubmitter.java | 2 +- .../flink/api/common/io/BinaryOutputFormat.java | 2 +- .../api/common/io/DelimitedInputFormat.java | 30 +- .../flink/api/common/io/FileInputFormat.java | 25 +- .../flink/api/common/io/FileOutputFormat.java | 34 +- .../api/common/io/GenericCsvInputFormat.java | 8 +- .../apache/flink/api/common/io/InputFormat.java | 2 +- .../api/common/io/SerializedOutputFormat.java | 2 +- .../configuration/GlobalConfiguration.java | 395 +++---------------- .../api/common/io/BinaryInputFormatTest.java | 3 +- .../io/DelimitedInputFormatSamplingTest.java | 44 ++- .../api/common/io/DelimitedInputFormatTest.java | 6 +- .../api/common/io/EnumerateNestedFilesTest.java | 4 +- .../api/common/io/FileOutputFormatTest.java | 12 +- .../api/common/io/RichInputFormatTest.java | 1 + .../api/common/io/RichOutputFormatTest.java | 1 + .../FilesystemSchemeConfigTest.java | 20 +- .../configuration/GlobalConfigurationTest.java | 201 +++------- 
.../apache/flink/testutils/TestConfigUtils.java | 29 +- .../java/hadoop/mapred/utils/HadoopUtils.java | 10 +- .../hadoop/mapreduce/utils/HadoopUtils.java | 12 +- .../flink/api/java/io/PrimitiveInputFormat.java | 4 +- .../flink/api/java/io/TextInputFormat.java | 2 +- .../flink/api/java/io/TextValueInputFormat.java | 2 +- .../flink/python/api/PythonPlanBinder.java | 11 +- .../plantranslate/JobGraphGenerator.java | 10 +- .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 10 +- .../flink/runtime/jobmanager/JobManager.scala | 3 +- .../flink/runtime/taskmanager/TaskManager.scala | 1 - .../BlobLibraryCacheManagerTest.java | 4 - .../org/apache/flink/api/scala/FlinkShell.scala | 4 +- .../flink/api/scala/ScalaShellITCase.scala | 4 +- .../environment/StreamContextEnvironment.java | 2 +- .../api/environment/StreamPlanEnvironment.java | 2 +- ...CliFrontendYarnAddressConfigurationTest.java | 8 - .../flink/yarn/YARNHighAvailabilityITCase.java | 2 +- .../flink/yarn/YARNSessionFIFOITCase.java | 2 +- .../yarn/AbstractYarnClusterDescriptor.java | 7 +- .../flink/yarn/YarnApplicationMasterRunner.java | 3 +- 46 files changed, 273 insertions(+), 700 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index a888841..7c2ee2e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -153,9 +153,7 @@ public class CliFrontend { // load the configuration LOG.info("Trying to load configuration file"); - GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); - 
System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, configDirectory.getAbsolutePath()); - this.config = GlobalConfiguration.getConfiguration(); + this.config = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); try { FileSystem.setDefaultScheme(config); http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java index 41d8622..8320e04 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java @@ -48,11 +48,6 @@ public class CliFrontendAddressConfigurationTest { CliFrontendTestUtils.pipeSystemOutToNull(); } - @Before - public void clearConfig() { - CliFrontendTestUtils.clearGlobalConfiguration(); - } - @Test public void testValidConfig() { try { http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java index 736d859..524e7e7 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java @@ -55,7 +55,6 @@ public class CliFrontendListCancelTest { @BeforeClass public static void init() { CliFrontendTestUtils.pipeSystemOutToNull(); - CliFrontendTestUtils.clearGlobalConfiguration(); } @Test 
http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java index f710d8e..0326eab 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java @@ -36,7 +36,6 @@ public class CliFrontendRunTest { @BeforeClass public static void init() { CliFrontendTestUtils.pipeSystemOutToNull(); - CliFrontendTestUtils.clearGlobalConfiguration(); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java index 7c34c75..9522ac7 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java @@ -35,7 +35,6 @@ import org.junit.Test; import java.util.UUID; import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; -import static org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration; import static org.junit.Assert.*; public class CliFrontendStopTest extends TestLogger { @@ -45,7 +44,6 @@ public class CliFrontendStopTest extends TestLogger { @BeforeClass public static void setup() { pipeSystemOutToNull(); - clearGlobalConfiguration(); actorSystem = ActorSystem.create("TestingActorSystem"); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java index 1872133..c411a7b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java @@ -71,33 +71,7 @@ public class CliFrontendTestUtils { System.setOut(new PrintStream(new BlackholeOutputSteam())); System.setErr(new PrintStream(new BlackholeOutputSteam())); } - - public static void clearGlobalConfiguration() { - try { - Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("SINGLETON"); - Field conf = GlobalConfiguration.class.getDeclaredField("config"); - Field map = Configuration.class.getDeclaredField("confData"); - - singletonInstanceField.setAccessible(true); - conf.setAccessible(true); - map.setAccessible(true); - - GlobalConfiguration gconf = (GlobalConfiguration) singletonInstanceField.get(null); - if (gconf != null) { - Configuration confObject = (Configuration) conf.get(gconf); - @SuppressWarnings("unchecked") - Map<String, Object> confData = (Map<String, Object>) map.get(confObject); - confData.clear(); - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test initialization caused an exception: " + e.getMessage()); - } - - } - + private static final class BlackholeOutputSteam extends java.io.OutputStream { @Override public void write(int b){} http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index 6ad250d..9628bb7 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -242,7 +242,7 @@ public class FlinkClient { } } - final Configuration configuration = GlobalConfiguration.getConfiguration(); + final Configuration configuration = GlobalConfiguration.loadConfiguration(); configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost); configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort); @@ -271,7 +271,7 @@ public class FlinkClient { * @return Flink's internally used {@link JobID}. */ JobID getTopologyJobId(final String id) { - final Configuration configuration = GlobalConfiguration.getConfiguration(); + final Configuration configuration = GlobalConfiguration.loadConfiguration(); if (this.timeout != null) { configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout); } @@ -311,7 +311,7 @@ public class FlinkClient { } private FiniteDuration getTimeout() { - final Configuration configuration = GlobalConfiguration.getConfiguration(); + final Configuration configuration = GlobalConfiguration.loadConfiguration(); if (this.timeout != null) { configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout); } @@ -320,7 +320,7 @@ public class FlinkClient { } private ActorRef getJobManager() throws IOException { - final Configuration configuration = GlobalConfiguration.getConfiguration(); + final Configuration configuration = GlobalConfiguration.loadConfiguration(); ActorSystem actorSystem; try { http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java 
---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java index 13a39ef..f8932b1 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java @@ -87,7 +87,7 @@ public class FlinkSubmitter { throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); } - final Configuration flinkConfig = GlobalConfiguration.getConfiguration(); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(); if (!stormConf.containsKey(Config.NIMBUS_HOST)) { stormConf.put(Config.NIMBUS_HOST, flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")); http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java index a89e73e..059198c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java @@ -45,7 +45,7 @@ public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> { private transient DataOutputViewStreamWrapper outView; - + @Override public void close() throws IOException { try { http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index 99aa022..fd02c82 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -20,13 +20,13 @@ package org.apache.flink.api.common.io; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; @@ -84,12 +84,18 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple */ private static int MAX_SAMPLE_LEN; - static { loadGlobalConfigParams(); } - + /** + * @Deprecated Please use {@code loadConfigParameters(Configuration config} + */ + @Deprecated protected static void loadGlobalConfigParams() { - int maxSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY, + loadConfigParameters(GlobalConfiguration.loadConfiguration()); + } + + protected static void loadConfigParameters(Configuration parameters) { + int maxSamples = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY, ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES); - int minSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY, + int minSamples = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY, 
ConfigConstants.DEFAULT_DELIMITED_FORMAT_MIN_LINE_SAMPLES); if (maxSamples < 0) { @@ -113,7 +119,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple DEFAULT_MIN_NUM_SAMPLES = minSamples; } - int maxLen = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY, + int maxLen = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY, ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN); if (maxLen <= 0) { maxLen = ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN; @@ -164,13 +170,17 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple // -------------------------------------------------------------------------------------------- // Constructors & Getters/setters for the configurable parameters // -------------------------------------------------------------------------------------------- - + public DelimitedInputFormat() { - super(); + this(null, null); } - - protected DelimitedInputFormat(Path filePath) { + + protected DelimitedInputFormat(Path filePath, Configuration configuration) { super(filePath); + if (configuration == null) { + configuration = GlobalConfiguration.loadConfiguration(); + } + loadConfigParameters(configuration); } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 95a1ffa..72d6061 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -87,15 +87,19 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS * The splitLength is set to -1L for reading the 
whole split. */ protected static final long READ_WHOLE_SPLIT_FLAG = -1L; - + static { - initDefaultsFromConfiguration(); + initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration()); initDefaultInflaterInputStreamFactories(); } - - private static void initDefaultsFromConfiguration() { - - final long to = GlobalConfiguration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY, + + /** + * Initialize defaults for input format. Needs to be a static method because it is configured for local + * cluster execution, see LocalFlinkMiniCluster. + * @param configuration The configuration to load defaults from + */ + private static void initDefaultsFromConfiguration(Configuration configuration) { + final long to = configuration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY, ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT); if (to < 0) { LOG.error("Invalid timeout value for filesystem stream opening: " + to + ". Using default value of " + @@ -154,10 +158,6 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS } } - static long getDefaultOpeningTimeout() { - return DEFAULT_OPENING_TIMEOUT; - } - // -------------------------------------------------------------------------------------------- // Variables for internal operation. 
// They are all transient, because we do not want them so be serialized @@ -224,11 +224,8 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS // -------------------------------------------------------------------------------------------- public FileInputFormat() {} - + protected FileInputFormat(Path filePath) { - if (filePath == null) { - throw new IllegalArgumentException("The file path must not be null."); - } this.filePath = filePath; } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java index 557c342..7530ba1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java @@ -22,11 +22,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import org.apache.flink.annotation.Public; +import org.apache.flink.configuration.GlobalConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -62,26 +62,30 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen private static WriteMode DEFAULT_WRITE_MODE; private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE; - - - private static final void initDefaultsFromConfiguration(Configuration configuration) { - final boolean overwrite = configuration.getBoolean(ConfigConstants - 
.FILESYSTEM_DEFAULT_OVERWRITE_KEY, + + static { + initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration()); + } + + /** + * Initialize defaults for output format. Needs to be a static method because it is configured for local + * cluster execution, see LocalFlinkMiniCluster. + * @param configuration The configuration to load defaults from + */ + private static void initDefaultsFromConfiguration(Configuration configuration) { + final boolean overwrite = configuration.getBoolean( + ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE); DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE; - final boolean alwaysCreateDirectory = configuration.getBoolean(ConfigConstants - .FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, + final boolean alwaysCreateDirectory = configuration.getBoolean( + ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY); DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? 
OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY; } - - static { - initDefaultsFromConfiguration(GlobalConfiguration.getConfiguration()); - } - + // -------------------------------------------------------------------------------------------- /** @@ -121,9 +125,9 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen private transient boolean fileCreated; // -------------------------------------------------------------------------------------------- - + public FileOutputFormat() {} - + public FileOutputFormat(Path outputPath) { this.outputFilePath = outputPath; } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index e2c54ad..85d9cd8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -98,15 +98,15 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> // -------------------------------------------------------------------------------------------- // Constructors and getters/setters for the configurable parameters // -------------------------------------------------------------------------------------------- - + protected GenericCsvInputFormat() { super(); } - + protected GenericCsvInputFormat(Path filePath) { - super(filePath); + super(filePath, null); } - + // -------------------------------------------------------------------------------------------- public int getNumberOfFieldsTotal() { http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java index 0e978b9..300c237 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java @@ -70,7 +70,7 @@ public interface InputFormat<OT, T extends InputSplit> extends InputSplitSource< * <p> * This method is always called first on a newly instantiated input format. * - * @param parameters The configuration with all parameters. + * @param parameters The configuration with all parameters (note: not the Flink config but the TaskConfig). */ void configure(Configuration parameters); http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java index 0ef160e..edbe1a0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java @@ -33,7 +33,7 @@ import org.apache.flink.core.memory.DataOutputView; public class SerializedOutputFormat<T extends IOReadableWritable> extends BinaryOutputFormat<T> { private static final long serialVersionUID = 1L; - + @Override protected void serialize(T record, DataOutputView dataOutputView) throws IOException { record.write(dataOutputView); http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index 7e50486..8d550d7 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -21,21 +21,12 @@ package org.apache.flink.configuration; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; -import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStreamReader; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; - import org.apache.flink.annotation.Internal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; -import org.w3c.dom.Text; /** * Global configuration object for Flink. Similar to Java properties configuration @@ -44,159 +35,62 @@ import org.w3c.dom.Text; @Internal public final class GlobalConfiguration { - /** The log object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class); - /** The global configuration object accessible through a singleton pattern. */ - private static GlobalConfiguration SINGLETON = null; - - /** The internal map holding the key-value pairs the configuration consists of. */ - private final Configuration config = new Configuration(); + public static final String FLINK_CONF_FILENAME = "flink-conf.yaml"; // -------------------------------------------------------------------------------------------- - - /** - * Retrieves the singleton object of the global configuration. 
- * - * @return the global configuration object - */ - private static GlobalConfiguration get() { - // lazy initialization currently only for testibility - synchronized (GlobalConfiguration.class) { - if (SINGLETON == null) { - SINGLETON = new GlobalConfiguration(); - } - return SINGLETON; - } - } - /** - * The constructor used to construct the singleton instance of the global configuration. - */ private GlobalConfiguration() {} // -------------------------------------------------------------------------------------------- - - /** - * Returns the value associated with the given key as a string. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - public static String getString(String key, String defaultValue) { - return get().config.getString(key, defaultValue); - } - - /** - * Returns the value associated with the given key as a long integer. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - public static long getLong(String key, long defaultValue) { - return get().config.getLong(key, defaultValue); - } /** - * Returns the value associated with the given key as an integer. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - public static int getInteger(String key, int defaultValue) { - return get().config.getInteger(key, defaultValue); - } - - /** - * Returns the value associated with the given key as a float. 
- * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key + * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an + * empty configuration object if the environment variable is not set. In production this variable is set but + * tests and local execution/debugging don't have this environment variable set. That's why we should fail + * if it is not set. + * @return Returns the Configuration */ - public static float getFloat(String key, float defaultValue) { - return get().config.getFloat(key, defaultValue); - } - - /** - * Returns the value associated with the given key as a boolean. - * - * @param key - * the key pointing to the associated value - * @param defaultValue - * the default value which is returned in case there is no value associated with the given key - * @return the (default) value associated with the given key - */ - public static boolean getBoolean(String key, boolean defaultValue) { - return get().config.getBoolean(key, defaultValue); + public static Configuration loadConfiguration() { + final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); + if (configDir == null) { + return new Configuration(); + } + return loadConfiguration(configDir); } /** * Loads the configuration files from the specified directory. * <p> - * XML and YAML are supported as configuration files. If both XML and YAML files exist in the configuration - * directory, keys from YAML will overwrite keys from XML. + * YAML files are supported as configuration files. 
* * @param configDir * the directory which contains the configuration files */ - public static void loadConfiguration(final String configDir) { + public static Configuration loadConfiguration(final String configDir) { if (configDir == null) { - LOG.warn("Given configuration directory is null, cannot load configuration"); - return; + throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration"); } final File confDirFile = new File(configDir); if (!(confDirFile.exists())) { - LOG.warn("The given configuration directory name '" + configDir + "' (" + confDirFile.getAbsolutePath() - + ") does not describe an existing directory."); - return; + throw new IllegalConfigurationException( + "The given configuration directory name '" + configDir + + "' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory."); } - - if (confDirFile.isFile()) { - final File file = new File(configDir); - if(configDir.endsWith(".xml")) { - get().loadXMLResource( file ); - } else if(configDir.endsWith(".yaml")) { - get().loadYAMLResource(file); - } else { - LOG.warn("The given configuration has an unknown extension."); - return; - } - return; - } - - // get all XML and YAML files in the directory - final File[] xmlFiles = filterFilesBySuffix(confDirFile, ".xml"); - final File[] yamlFiles = filterFilesBySuffix(confDirFile, new String[] { ".yaml", ".yml" }); - if ((xmlFiles == null || xmlFiles.length == 0) && (yamlFiles == null || yamlFiles.length == 0)) { - LOG.warn("Unable to get the contents of the config directory '" + configDir + "' (" - + confDirFile.getAbsolutePath() + ")."); - return; - } + // get Flink yaml configuration file + final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME); - // load config files and write into config map - for (File f : xmlFiles) { - get().loadXMLResource(f); + if (!yamlConfigFile.exists()) { + throw new IllegalConfigurationException( + "The Flink config file '" + yamlConfigFile + + 
"' (" + confDirFile.getAbsolutePath() + ") does not exist."); } - // => if both XML and YAML files exist, the YAML config keys overwrite XML settings - for (File f : yamlFiles) { - get().loadYAMLResource(f); - } + return loadYAMLResource(yamlConfigFile); } /** @@ -219,237 +113,46 @@ public final class GlobalConfiguration { * @param file the YAML file to read from * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a> */ - private void loadYAMLResource(File file) { - - synchronized (getClass()) { - - BufferedReader reader = null; - try { - reader = new BufferedReader(new InputStreamReader(new FileInputStream(file))); - - String line = null; - while ((line = reader.readLine()) != null) { - - // 1. check for comments - String[] comments = line.split("#", 2); - String conf = comments[0]; - - // 2. get key and value - if (conf.length() > 0) { - String[] kv = conf.split(": ", 2); - - // skip line with no valid key-value pair - if (kv.length == 1) { - LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line); - continue; - } - - String key = kv[0].trim(); - String value = kv[1].trim(); - - // sanity check - if (key.length() == 0 || value.length() == 0) { - LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line); - continue; - } - - LOG.debug("Loading configuration property: {}, {}", key, value); - - this.config.setString(key, value); - } - } - } - catch (IOException e) { - LOG.error("Error parsing YAML configuration.", e); - } - finally { - try { - if(reader != null) { - reader.close(); - } - } catch (IOException e) { - LOG.warn("Cannot to close reader with IOException.", e); - } - } - } - } - - /** - * Loads an XML document of key-values pairs. 
- * - * @param file - * the XML document file - */ - private void loadXMLResource(File file) { + private static Configuration loadYAMLResource(File file) { + final Configuration config = new Configuration(); - final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); - // Ignore comments in the XML file - docBuilderFactory.setIgnoringComments(true); - docBuilderFactory.setNamespaceAware(true); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))){ - try { + String line; + while ((line = reader.readLine()) != null) { - final DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); - Document doc; - Element root; + // 1. check for comments + String[] comments = line.split("#", 2); + String conf = comments[0]; - doc = builder.parse(file); - - if (doc == null) { - LOG.warn("Cannot load configuration: doc is null"); - return; - } + // 2. get key and value + if (conf.length() > 0) { + String[] kv = conf.split(": ", 2); - root = doc.getDocumentElement(); - if (root == null) { - LOG.warn("Cannot load configuration: root is null"); - return; - } - - if (!"configuration".equals(root.getNodeName())) { - return; - } - - final NodeList props = root.getChildNodes(); - int propNumber = -1; - - synchronized (getClass()) { - - for (int i = 0; i < props.getLength(); i++) { - - final Node propNode = props.item(i); - String key = null; - String value = null; - - // Ignore text at this point - if (propNode instanceof Text) { + // skip line with no valid key-value pair + if (kv.length == 1) { + LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line); continue; } - if (!(propNode instanceof Element)) { - continue; - } - - Element property = (Element) propNode; - if (!"property".equals(property.getNodeName())) { - continue; - } + String key = kv[0].trim(); + String value = kv[1].trim(); - propNumber++; - final NodeList propChildren = property.getChildNodes(); - if 
(propChildren == null) { - LOG.warn("Error while reading configuration: property has no children, skipping..."); + // sanity check + if (key.length() == 0 || value.length() == 0) { + LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line); continue; } - for (int j = 0; j < propChildren.getLength(); j++) { - - final Node propChild = propChildren.item(j); - if (propChild instanceof Element) { - if ("key".equals(propChild.getNodeName())) { - if (propChild.getChildNodes() != null) { - if (propChild.getChildNodes().getLength() == 1) { - if (propChild.getChildNodes().item(0) instanceof Text) { - final Text t = (Text) propChild.getChildNodes().item(0); - key = t.getTextContent(); - } - } - } - } - - if ("value".equals(propChild.getNodeName())) { - if (propChild.getChildNodes() != null) { - if (propChild.getChildNodes().getLength() == 1) { - if (propChild.getChildNodes().item(0) instanceof Text) { - final Text t = (Text) propChild.getChildNodes().item(0); - value = t.getTextContent(); - } - } - } - } - } - } - - if (key != null && value != null) { - // Put key, value pair into the map - LOG.debug("Loading configuration property: {}, {}", key, value); - this.config.setString(key, value); - } else { - LOG.warn("Error while reading configuration: Cannot read property " + propNumber); - } + LOG.debug("Loading configuration property: {}, {}", key, value); + config.setString(key, value); } } - - } - catch (Exception e) { - LOG.error("Cannot load configuration.", e); + } catch (IOException e) { + throw new RuntimeException("Error parsing YAML configuration.", e); } - } - /** - * Gets a {@link Configuration} object with the values of this GlobalConfiguration - * - * @return the {@link Configuration} object including the key/value pairs - */ - public static Configuration getConfiguration() { - Configuration copy = new Configuration(); - copy.addAll(get().config); - return copy; - } - - /** - * Merges the given {@link Configuration} object into 
the global - * configuration. If a key/value pair with an identical already - * exists in the global configuration, it is overwritten by the - * pair of the {@link Configuration} object. - * - * @param conf - * the {@link Configuration} object to merge into the global configuration - */ - public static void includeConfiguration(Configuration conf) { - get().includeConfigurationInternal(conf); - } - - /** - * Internal non-static method to include configuration. - * - * @param conf - * the {@link Configuration} object to merge into the global configuration - */ - private void includeConfigurationInternal(Configuration conf) { - // static synchronized - synchronized (getClass()) { - this.config.addAll(conf); - } - } - - // -------------------------------------------------------------------------------------------- - - /** - * Filters files in directory which have the specified suffix (e.g. ".xml"). - * - * @param dirToFilter - * directory to filter - * @param suffix - * suffix to filter files by (e.g. 
".xml") - * @return files with given ending in directory - */ - private static File[] filterFilesBySuffix(final File dirToFilter, final String suffix) { - return filterFilesBySuffix(dirToFilter, new String[] { suffix }); + return config; } - private static File[] filterFilesBySuffix(final File dirToFilter, final String[] suffixes) { - return dirToFilter.listFiles(new FilenameFilter() { - @Override - public boolean accept(final File dir, final String name) { - for (String suffix : suffixes) { - if (dir.equals(dirToFilter) && name != null && name.endsWith(suffix)) { - return true; - } - } - - return false; - } - }); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java index 90b366c..a7374e3 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java @@ -41,7 +41,8 @@ public class BinaryInputFormatTest { return record; } } - + + @Test public void testCreateInputSplitsWithOneFile() throws IOException { // create temporary file with 3 blocks http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java index fac979e..be73798 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java @@ -30,6 +30,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; + public class DelimitedInputFormatSamplingTest { private static final String TEST_DATA1 = @@ -66,6 +67,8 @@ public class DelimitedInputFormatSamplingTest { private static final int DEFAULT_NUM_SAMPLES = 4; + private static Configuration CONFIG; + // ======================================================================== // Setup // ======================================================================== @@ -80,16 +83,17 @@ public class DelimitedInputFormatSamplingTest { try { // make sure we do 4 samples - TestConfigUtils.loadGlobalConf( + CONFIG = TestConfigUtils.loadGlobalConf( new String[] { ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY, ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY }, new String[] { "4", "4" }); - - TestDelimitedInputFormat.prepare(); + + } catch (Throwable t) { Assert.fail("Could not load the global configuration."); } } + // ======================================================================== // Tests @@ -101,7 +105,7 @@ public class DelimitedInputFormatSamplingTest { final String tempFile = TestFileUtils.createTempFile(TEST_DATA1); final Configuration conf = new Configuration(); - final TestDelimitedInputFormat format = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG); format.setFilePath(tempFile.replace("file", "test")); format.configure(conf); @@ -109,7 +113,7 @@ public class DelimitedInputFormatSamplingTest { format.getStatistics(null); Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened()); - TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(); + TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG); format2.setFilePath(tempFile.replace("file", "test")); 
format2.setNumLineSamples(8); format2.configure(conf); @@ -130,7 +134,7 @@ public class DelimitedInputFormatSamplingTest { final String tempFile = TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA1, TEST_DATA1, TEST_DATA1); final Configuration conf = new Configuration(); - final TestDelimitedInputFormat format = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG); format.setFilePath(tempFile.replace("file", "test")); format.configure(conf); @@ -138,7 +142,7 @@ public class DelimitedInputFormatSamplingTest { format.getStatistics(null); Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened()); - TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(); + TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG); format2.setFilePath(tempFile.replace("file", "test")); format2.setNumLineSamples(8); format2.configure(conf); @@ -159,7 +163,7 @@ public class DelimitedInputFormatSamplingTest { final String tempFile = TestFileUtils.createTempFile(TEST_DATA1); final Configuration conf = new Configuration(); - final TestDelimitedInputFormat format = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG); format.setFilePath(tempFile); format.configure(conf); BaseStatistics stats = format.getStatistics(null); @@ -180,7 +184,7 @@ public class DelimitedInputFormatSamplingTest { final String tempFile = TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA2); final Configuration conf = new Configuration(); - final TestDelimitedInputFormat format = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG); format.setFilePath(tempFile); format.configure(conf); BaseStatistics stats = format.getStatistics(null); @@ -212,7 +216,7 @@ public class DelimitedInputFormatSamplingTest { final String tempFile = 
TestFileUtils.createTempFile(testData); final Configuration conf = new Configuration(); - final TestDelimitedInputFormat format = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG); format.setFilePath(tempFile); format.setDelimiter(DELIMITER); format.configure(conf); @@ -235,7 +239,7 @@ public class DelimitedInputFormatSamplingTest { final String tempFile = TestFileUtils.createTempFile(2 * ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN); final Configuration conf = new Configuration(); - final TestDelimitedInputFormat format = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG); format.setFilePath(tempFile); format.configure(conf); @@ -252,7 +256,7 @@ public class DelimitedInputFormatSamplingTest { final String tempFile = TestFileUtils.createTempFile(TEST_DATA1); final Configuration conf = new Configuration(); - final TestDelimitedInputFormat format = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG); format.setFilePath("test://" + tempFile); format.configure(conf); @@ -260,7 +264,7 @@ public class DelimitedInputFormatSamplingTest { BaseStatistics stats = format.getStatistics(null); Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened()); - final TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(); + final TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG); format2.setFilePath("test://" + tempFile); format2.configure(conf); @@ -274,21 +278,21 @@ public class DelimitedInputFormatSamplingTest { Assert.fail(e.getMessage()); } } - + // ======================================================================== // Mocks // ======================================================================== - + private static final class TestDelimitedInputFormat extends DelimitedInputFormat<IntValue> { 
private static final long serialVersionUID = 1L; - + + TestDelimitedInputFormat(Configuration configuration) { + super(null, configuration); + } + @Override public IntValue readRecord(IntValue reuse, byte[] bytes, int offset, int numBytes) { throw new UnsupportedOperationException(); } - - public static void prepare() { - DelimitedInputFormat.loadGlobalConfigParams(); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java index 599a640..8a31099 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.List; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; @@ -47,17 +48,18 @@ import org.junit.Test; public class DelimitedInputFormatTest { - private final DelimitedInputFormat<String> format = new MyTextInputFormat(); + private DelimitedInputFormat<String> format; // -------------------------------------------------------------------------------------------- @Before public void setup() { + format = new MyTextInputFormat(); this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read")); } @After - public void setdown() throws Exception { + public void shutdown() throws Exception { if (this.format != null) { this.format.close(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java index 68465a3..1076338 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.testutils.TestFileUtils; @@ -37,11 +38,12 @@ public class EnumerateNestedFilesTest { protected Configuration config; final String tempPath = System.getProperty("java.io.tmpdir"); - private final DummyFileInputFormat format = new DummyFileInputFormat(); + private DummyFileInputFormat format; @Before public void setup() { this.config = new Configuration(); + format = new DummyFileInputFormat(); } @After http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java index cc040b6..4a598f2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java @@ -22,13 +22,12 @@ package 
org.apache.flink.api.common.io; import java.io.File; import java.io.IOException; -import org.junit.Assert; - import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.types.IntValue; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.fail; @@ -38,11 +37,8 @@ public class FileOutputFormatTest { @Test public void testCreateNonParallelLocalFS() throws IOException { - File tmpOutPath = null; - File tmpOutFile = null; - - tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1"); - tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1"); + File tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1"); + File tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1"); String tmpFilePath = tmpOutPath.toURI().toString(); @@ -652,8 +648,10 @@ public class FileOutputFormatTest { // ------------------------------------------------------------------------------------------- public static class DummyFileOutputFormat extends FileOutputFormat<IntValue> { + private static final long serialVersionUID = 1L; public boolean testFileName = false; + @Override public void writeRecord(IntValue record) throws IOException { // DO NOTHING http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java index ae0f8e5..c3cbb58 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java @@ -52,4 +52,5 @@ public class 
RichInputFormatTest { assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java index 296af11..4c303a6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java @@ -52,4 +52,5 @@ public class RichOutputFormatTest { assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java index 3e1a723..0cf5e32 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java @@ -21,7 +21,6 @@ package org.apache.flink.configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; -import org.junit.Before; import org.junit.Test; import java.io.File; @@ -38,15 +37,6 @@ import static org.junit.Assert.fail; public class FilesystemSchemeConfigTest { - @Before - 
public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException { - // reset GlobalConfiguration between tests - Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON"); - instance.setAccessible(true); - instance.set(null, null); - } - @Test public void testExplicitFilesystemScheme() { testSettingFilesystemScheme(false, "fs.default-scheme: otherFS://localhost:1234/", true); @@ -65,7 +55,12 @@ public class FilesystemSchemeConfigTest { private void testSettingFilesystemScheme(boolean useDefaultScheme, String configFileScheme, boolean useExplicitScheme) { final File tmpDir = getTmpDir(); - final File confFile = createRandomFile(tmpDir, ".yaml"); + final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME); + try { + confFile.createNewFile(); + } catch (IOException e) { + throw new RuntimeException("Couldn't create file", e); + } final File testFile = new File(tmpDir.getAbsolutePath() + File.separator + "testing.txt"); try { @@ -83,8 +78,7 @@ public class FilesystemSchemeConfigTest { fail(e.getMessage()); } - GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); - Configuration conf = GlobalConfiguration.getConfiguration(); + Configuration conf = GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); try { FileSystem.setDefaultScheme(conf); http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java index ce55d2e..6336a73 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java @@ 
-19,114 +19,58 @@ package org.apache.flink.configuration; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertNotNull; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.PrintWriter; -import java.lang.reflect.Field; import java.util.UUID; -import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** * This class contains tests for the global configuration (parsing configuration directory information). */ public class GlobalConfigurationTest extends TestLogger { - @Before - public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException { - // reset GlobalConfiguration between tests - Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON"); - instance.setAccessible(true); - instance.set(null, null); - } - - @Test - public void testConfigurationMixed() { - File tmpDir = getTmpDir(); - File confFile1 = createRandomFile(tmpDir, ".yaml"); - File confFile2 = createRandomFile(tmpDir, ".xml"); + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); - try { - try { - PrintWriter pw1 = new PrintWriter(confFile1); - PrintWriter pw2 = new PrintWriter(confFile2); - - pw1.println("mykey1: myvalue1_YAML"); - pw1.println("mykey2: myvalue2"); - - pw2.println("<configuration>"); - pw2.println("<property><key>mykey1</key><value>myvalue1_XML</value></property>"); - pw2.println("<property><key>mykey3</key><value>myvalue3</value></property>"); - pw2.println("</configuration>"); - - pw1.close(); - pw2.close(); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } - - GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); - Configuration conf = 
GlobalConfiguration.getConfiguration(); - - // all distinct keys from confFile1 + confFile2key - assertEquals(3, conf.keySet().size()); - - // keys 1, 2, 3 should be OK and match the expected values - // => configuration keys from YAML should overwrite keys from XML - assertEquals("myvalue1_YAML", conf.getString("mykey1", null)); - assertEquals("myvalue2", conf.getString("mykey2", null)); - assertEquals("myvalue3", conf.getString("mykey3", null)); - } finally { - confFile1.delete(); - confFile2.delete(); - tmpDir.delete(); - } - } - @Test public void testConfigurationYAML() { - File tmpDir = getTmpDir(); - File confFile1 = createRandomFile(tmpDir, ".yaml"); - File confFile2 = createRandomFile(tmpDir, ".yml"); + File tmpDir = tempFolder.getRoot(); + File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME); try { - try { - PrintWriter pw1 = new PrintWriter(confFile1); - PrintWriter pw2 = new PrintWriter(confFile2); - - pw1.println("###########################"); // should be skipped - pw1.println("# Some : comments : to skip"); // should be skipped - pw1.println("###########################"); // should be skipped - pw1.println("mykey1: myvalue1"); // OK, simple correct case - pw1.println("mykey2 : myvalue2"); // OK, whitespace before colon is correct - pw1.println("mykey3:myvalue3"); // SKIP, missing white space after colon - pw1.println(" some nonsense without colon and whitespace separator"); // SKIP - pw1.println(" : "); // SKIP - pw1.println(" "); // SKIP - pw1.println("mykey4: myvalue4# some comments"); // OK, skip comments only - pw1.println(" mykey5 : myvalue5 "); // OK, trim unnecessary whitespace - pw1.println("mykey6: my: value6"); // OK, only use first ': ' as separator - pw1.println("mykey7: "); // SKIP, no value provided - pw1.println(": myvalue8"); // SKIP, no key provided - - pw2.println("mykey9: myvalue9"); // OK - pw2.println("mykey9: myvalue10"); // OK, overwrite last value - - pw1.close(); - pw2.close(); + try (final PrintWriter 
pw = new PrintWriter(confFile)) { + + pw.println("###########################"); // should be skipped + pw.println("# Some : comments : to skip"); // should be skipped + pw.println("###########################"); // should be skipped + pw.println("mykey1: myvalue1"); // OK, simple correct case + pw.println("mykey2 : myvalue2"); // OK, whitespace before colon is correct + pw.println("mykey3:myvalue3"); // SKIP, missing white space after colon + pw.println(" some nonsense without colon and whitespace separator"); // SKIP + pw.println(" : "); // SKIP + pw.println(" "); // SKIP + pw.println("mykey4: myvalue4# some comments"); // OK, skip comments only + pw.println(" mykey5 : myvalue5 "); // OK, trim unnecessary whitespace + pw.println("mykey6: my: value6"); // OK, only use first ': ' as separator + pw.println("mykey7: "); // SKIP, no value provided + pw.println(": myvalue8"); // SKIP, no key provided + + pw.println("mykey9: myvalue9"); // OK + pw.println("mykey9: myvalue10"); // OK, overwrite last value + } catch (FileNotFoundException e) { e.printStackTrace(); } - GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); - Configuration conf = GlobalConfiguration.getConfiguration(); + Configuration conf = GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); // all distinct keys from confFile1 + confFile2 key assertEquals(6, conf.keySet().size()); @@ -142,83 +86,36 @@ public class GlobalConfigurationTest extends TestLogger { assertEquals("null", conf.getString("mykey8", "null")); assertEquals("myvalue10", conf.getString("mykey9", null)); } finally { - confFile1.delete(); - confFile2.delete(); + confFile.delete(); tmpDir.delete(); } } - /** - * This test creates several configuration files with values and cross-checks the resulting - * {@link GlobalConfiguration} object. 
- */ - @Test - public void testConfigurationXML() { - - // Create temporary directory for configuration files - final File tmpDir = getTmpDir(); - final File confFile1 = createRandomFile(tmpDir, ".xml"); - final File confFile2 = createRandomFile(tmpDir, ".xml"); - - try { - try { - final PrintWriter pw1 = new PrintWriter(confFile1); - final PrintWriter pw2 = new PrintWriter(confFile2); - - pw1.append("<configuration>"); - pw2.append("<configuration>"); - - pw1.append("<property><key>mykey1</key><value>myvalue1</value></property>"); - pw1.append("<property></property>"); - pw1.append("<property><key></key><value></value></property>"); - pw1.append("<property><key>hello</key><value></value></property>"); - pw1.append("<property><key>mykey2</key><value>myvalue2</value></property>"); - pw2.append("<property><key>mykey3</key><value>myvalue3</value></property>"); - pw2.append("<property><key>mykey4</key><value>myvalue4</value></property>"); - - pw1.append("</configuration>"); - pw2.append("</configuration>"); - pw1.close(); - pw2.close(); - } catch (FileNotFoundException e) { - fail(e.getMessage()); - } - - GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); + @Test(expected = IllegalArgumentException.class) + public void testFailIfNull() { + GlobalConfiguration.loadConfiguration(null); + } - final Configuration co = GlobalConfiguration.getConfiguration(); + @Test(expected = IllegalConfigurationException.class) + public void testFailIfNotLoaded() { + GlobalConfiguration.loadConfiguration("/some/path/" + UUID.randomUUID()); + } - assertEquals(co.getString("mykey1", "null"), "myvalue1"); - assertEquals(co.getString("mykey2", "null"), "myvalue2"); - assertEquals(co.getString("mykey3", "null"), "myvalue3"); - assertEquals(co.getString("mykey4", "null"), "myvalue4"); + @Test(expected = IllegalConfigurationException.class) + public void testInvalidConfiguration() throws IOException { + GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath()); 
+ } - // // Test (wrong) string-to integer conversion. should return default value. - // semantics are changed to throw an exception upon invalid parsing! - // assertEquals(co.getInteger("mykey1", 500), 500); - // assertEquals(co.getInteger("anything", 500), 500); - // assertEquals(co.getBoolean("notexistent", true), true); + @Test + // We allow malformed YAML files + public void testInvalidYamlFile() throws IOException { + final File confFile = tempFolder.newFile(GlobalConfiguration.FLINK_CONF_FILENAME); - // Test include local configuration - final Configuration newconf = new Configuration(); - newconf.setInteger("mynewinteger", 1000); - GlobalConfiguration.includeConfiguration(newconf); - assertEquals(GlobalConfiguration.getInteger("mynewinteger", 0), 1000); - } finally { - // Remove temporary files - confFile1.delete(); - confFile2.delete(); - tmpDir.delete(); + try (PrintWriter pw = new PrintWriter(confFile);) { + pw.append("invalid"); } - } - private File getTmpDir() { - File tmpDir = new File(CommonTestUtils.getTempDir(), UUID.randomUUID().toString()); - assertTrue(tmpDir.mkdirs()); - return tmpDir; + assertNotNull(GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath())); } - private File createRandomFile(File path, String suffix) { - return new File(path, UUID.randomUUID().toString() + suffix); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java index 6096f69..d34f20a 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; 
+import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; /** @@ -30,20 +31,20 @@ import org.apache.flink.configuration.GlobalConfiguration; */ public final class TestConfigUtils { - public static void loadGlobalConf(String[] keys, String[] values) throws IOException { - loadGlobalConf(getConfAsString(keys, values)); + public static Configuration loadGlobalConf(String[] keys, String[] values) throws IOException { + return loadGlobalConf(getConfAsString(keys, values)); } - public static void loadGlobalConf(String contents) throws IOException { + public static Configuration loadGlobalConf(String contents) throws IOException { final File tempDir = new File(System.getProperty("java.io.tmpdir")); - File confDir = null; + File confDir; do { confDir = new File(tempDir, TestFileUtils.randomFileName()); } while (confDir.exists()); try { confDir.mkdirs(); - final File confFile = new File(confDir, "tempConfig.xml"); + final File confFile = new File(confDir, GlobalConfiguration.FLINK_CONF_FILENAME); try { BufferedWriter writer = new BufferedWriter(new FileWriter(confFile)); @@ -52,7 +53,7 @@ public final class TestConfigUtils { } finally { writer.close(); } - GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath()); + return GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath()); } finally { confFile.delete(); } @@ -61,25 +62,25 @@ public final class TestConfigUtils { confDir.delete(); } } - + public static String getConfAsString(String[] keys, String[] values) { if (keys == null || values == null || keys.length != values.length) { throw new IllegalArgumentException(); } - + StringBuilder bld = new StringBuilder(); - bld.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<configuration>\n"); - + for (int i = 0; i < keys.length; i++) { - bld.append("<property>\n<key>").append(keys[i]).append("</key>\n"); - bld.append("<value>").append(values[i]).append("</value>\n</property>\n"); + bld.append(keys[i]); + 
bld.append(": "); + bld.append(values[i]); + bld.append(System.lineSeparator()); } - bld.append("</configuration>\n"); return bld.toString(); } // ------------------------------------------------------------------------ - + private TestConfigUtils() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java index ab4e993..7c41eaf 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java @@ -104,13 +104,17 @@ public final class HadoopUtils { * This method is public because its being used in the HadoopDataSource. */ public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() { + + org.apache.flink.configuration.Configuration flinkConfiguration = + GlobalConfiguration.loadConfiguration(); + Configuration retConf = new org.apache.hadoop.conf.Configuration(); // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and // the hdfs configuration // Try to load HDFS configuration from Hadoop's own configuration files // 1. 
approach: Flink configuration - final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants + final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants .HDFS_DEFAULT_CONFIG, null); if (hdfsDefaultPath != null) { retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath)); @@ -118,7 +122,7 @@ public final class HadoopUtils { LOG.debug("Cannot find hdfs-default configuration file"); } - final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); + final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); if (hdfsSitePath != null) { retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath)); } else { @@ -127,7 +131,7 @@ public final class HadoopUtils { // 2. Approach environment variables String[] possibleHadoopConfPaths = new String[4]; - possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); if (System.getenv("HADOOP_HOME") != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java index b219de4..52fd734 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java @@ -37,12 +37,14 @@ public final class HadoopUtils { /** * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration. 
*/ - public static void mergeHadoopConf(Configuration configuration) { - Configuration hadoopConf = org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(); - + public static void mergeHadoopConf(Configuration hadoopConfig) { + + Configuration hadoopConf = + org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(); + for (Map.Entry<String, String> e : hadoopConf) { - if (configuration.get(e.getKey()) == null) { - configuration.set(e.getKey(), e.getValue()); + if (hadoopConfig.get(e.getKey()) == null) { + hadoopConfig.set(e.getKey(), e.getValue()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java index 75b82cd..05ed6fa 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java @@ -46,12 +46,12 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> { public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) { - super(filePath); + super(filePath, null); this.primitiveClass = primitiveClass; } public PrimitiveInputFormat(Path filePath, String delimiter, Class<OT> primitiveClass) { - super(filePath); + super(filePath, null); this.primitiveClass = primitiveClass; this.setDelimiter(delimiter); } http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java index d6a02f1..b2554bf 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java @@ -50,7 +50,7 @@ public class TextInputFormat extends DelimitedInputFormat<String> { // -------------------------------------------------------------------------------------------- public TextInputFormat(Path filePath) { - super(filePath); + super(filePath, null); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java index a0d20d6..45a2e3e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java @@ -50,7 +50,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> { // -------------------------------------------------------------------------------------------- public TextValueInputFormat(Path filePath) { - super(filePath); + super(filePath, null); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index a6cbfa8..d55b9d4 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -73,8 +73,10 @@ public class PythonPlanBinder { public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3"; public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT"; public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_"; - public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python"); - public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); + public static String FLINK_PYTHON2_BINARY_PATH = + GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON2_BINARY_KEY, "python"); + public static String FLINK_PYTHON3_BINARY_PATH = + GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON3_BINARY_KEY, "python3"); private static final Random r = new Random(); @@ -113,8 +115,9 @@ public class PythonPlanBinder { } public PythonPlanBinder() throws IOException { - FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python"); - FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); + Configuration conf = GlobalConfiguration.loadConfiguration(); + FLINK_PYTHON2_BINARY_PATH = conf.getString(FLINK_PYTHON2_BINARY_KEY, "python"); + FLINK_PYTHON3_BINARY_PATH = conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); FULL_PATH = FLINK_DIR != null //command-line ? 
FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 12c5dfc..5ab1fbf 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.dag.TempMode; import org.apache.flink.optimizer.plan.BulkIterationPlanNode; @@ -49,7 +50,6 @@ import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plan.WorksetPlanNode; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.optimizer.util.Utils; import org.apache.flink.runtime.io.network.DataExchangeMode; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -107,10 +107,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> { public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux"; - private static final boolean mergeIterationAuxTasks = 
GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false); - + private static final boolean mergeIterationAuxTasks = + GlobalConfiguration.loadConfiguration().getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false); + private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null, null); - + // ------------------------------------------------------------------------ private Map<PlanNode, JobVertex> vertices; // a map from optimizer nodes to job vertices @@ -156,7 +157,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> { this.useLargeRecordHandler = config.getBoolean( ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY, ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER); - } /** http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 4e05ebe..5d7173b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -176,20 +176,24 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst * This method is public because its being used in the HadoopDataSource. */ public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() { + + org.apache.flink.configuration.Configuration flinkConfiguration = + GlobalConfiguration.loadConfiguration(); + Configuration retConf = new org.apache.hadoop.conf.Configuration(); // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and // the hdfs configuration // Try to load HDFS configuration from Hadoop's own configuration files // 1. 
approach: Flink configuration - final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null); + final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null); if (hdfsDefaultPath != null) { retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath)); } else { LOG.debug("Cannot find hdfs-default configuration file"); } - final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); + final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); if (hdfsSitePath != null) { retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath)); } else { @@ -198,7 +202,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst // 2. Approach environment variables String[] possibleHadoopConfPaths = new String[4]; - possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); if (System.getenv("HADOOP_HOME") != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f14a37f..84d38c1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2350,8 +2350,7 @@ object JobManager { } LOG.info("Loading configuration from " + configDir) - GlobalConfiguration.loadConfiguration(configDir) - val configuration = 
GlobalConfiguration.getConfiguration() + val configuration = GlobalConfiguration.loadConfiguration(configDir) try { FileSystem.setDefaultScheme(configuration) http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a7dd789..226fa75 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1526,7 +1526,6 @@ object TaskManager { val conf: Configuration = try { LOG.info("Loading configuration from " + cliConfig.getConfigDir()) GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir()) - GlobalConfiguration.getConfiguration() } catch { case e: Exception => throw new Exception("Could not load configuration", e)
