Repository: flink
Updated Branches:
  refs/heads/master 010f44c71 -> ae3d547af


[FLINK-8475][config][docs] Integrate FS options

This closes #5459.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29dcec5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29dcec5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29dcec5c

Branch: refs/heads/master
Commit: 29dcec5cc44f4dd9b0124d9c0a15d31a92ab30f6
Parents: 010f44c
Author: zentol <ches...@apache.org>
Authored: Mon Jan 22 16:16:02 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Wed Feb 14 11:56:28 2018 +0100

----------------------------------------------------------------------
 .../generated/file_system_configuration.html    | 26 ++++++++++++++++++
 docs/ops/config.md                              | 11 +-------
 .../org/apache/flink/client/LocalExecutor.java  |  3 +-
 .../flink/api/common/io/FileOutputFormat.java   | 10 ++-----
 .../flink/configuration/ConfigConstants.java    |  9 ++++++
 .../apache/flink/configuration/CoreOptions.java | 29 +++++++++++++++++++-
 .../apache/flink/test/util/TestBaseUtils.java   |  3 +-
 .../test/cancelling/CancelingTestBase.java      |  3 +-
 8 files changed, 73 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/docs/_includes/generated/file_system_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/file_system_configuration.html 
b/docs/_includes/generated/file_system_configuration.html
new file mode 100644
index 0000000..f3738f9
--- /dev/null
+++ b/docs/_includes/generated/file_system_configuration.html
@@ -0,0 +1,26 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>fs.default-scheme</h5></td>
+            <td>(none)</td>
+            <td>The default filesystem scheme, used for paths that do not 
declare a scheme explicitly. May contain an authority, e.g. host:port in case 
of a HDFS NameNode.</td>
+        </tr>
+        <tr>
+            <td><h5>fs.output.always-create-directory</h5></td>
+            <td>false</td>
+            <td>File writers running with a parallelism larger than one create 
a directory for the output file path and put the different result files (one 
per parallel writer task) into that directory. If this option is set to "true", 
writers with a parallelism of 1 will also create a directory and place a single 
result file into it. If the option is set to "false", the writer will 
create the file directly at the output path, without creating a containing 
directory.</td>
+        </tr>
+        <tr>
+            <td><h5>fs.overwrite-files</h5></td>
+            <td>false</td>
+            <td>Specifies whether file output writers should overwrite 
existing files by default. Set to "true" to overwrite by default, "false" 
otherwise.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index adead19..22f9213 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -350,16 +350,7 @@ These parameters allow for advanced tuning. The default 
values are sufficient wh
 
 ### File Systems
 
-The parameters define the behavior of tasks that create result files.
-
-- `fs.default-scheme`: The default filesystem scheme to be used, with the 
necessary authority to contact, e.g. the host:port of the NameNode in the case 
of HDFS (if needed).
-By default, this is set to `file:///` which points to the local filesystem. 
This means that the local
-filesystem is going to be used to search for user-specified files **without** 
an explicit scheme
-definition. This scheme is used **ONLY** if no other scheme is specified 
(explicitly) in the user-provided `URI`.
-
-- `fs.overwrite-files`: Specifies whether file output writers should overwrite 
existing files by default. Set to *true* to overwrite by default, *false* 
otherwise. (DEFAULT: false)
-
-- `fs.output.always-create-directory`: File writers running with a parallelism 
larger than one create a directory for the output file path and put the 
different result files (one per parallel writer task) into that directory. If 
this option is set to *true*, writers with a parallelism of 1 will also create 
a directory and place a single result file into it. If the option is set to 
*false*, the writer will directly create the file directly at the output path, 
without creating a containing directory. (DEFAULT: false)
+{% include generated/file_system_configuration.html %}
 
 ### Compiler/Optimizer
 

http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index bb74bdb..dd7e3ef 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -232,7 +233,7 @@ public class LocalExecutor extends PlanExecutor {
        private Configuration createConfiguration() {
                Configuration configuration = new Configuration();
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
getTaskManagerNumSlots());
-               
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, 
isDefaultOverwriteFiles());
+               
configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, 
isDefaultOverwriteFiles());
                return configuration;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 1382f06..585d7cc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -22,10 +22,10 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -73,15 +73,11 @@ public abstract class FileOutputFormat<IT> extends 
RichOutputFormat<IT> implemen
         * @param configuration The configuration to load defaults from
         */
        private static void initDefaultsFromConfiguration(Configuration 
configuration) {
-               final boolean overwrite = configuration.getBoolean(
-                               
ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
-                               ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
+               final boolean overwrite = 
configuration.getBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE);
        
                DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : 
WriteMode.NO_OVERWRITE;
                
-               final boolean alwaysCreateDirectory = configuration.getBoolean(
-                       
ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
-                       
ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);
+               final boolean alwaysCreateDirectory = 
configuration.getBoolean(CoreOptions.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY);
        
                DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? 
OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 105ee22..b916048 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -643,12 +643,18 @@ public final class ConfigConstants {
 
        /**
         * Key to specify whether the file systems should simply overwrite 
existing files.
+        *
+        * @deprecated Use {@link CoreOptions#FILESYTEM_DEFAULT_OVERRIDE} 
instead.
         */
+       @Deprecated
        public static final String FILESYSTEM_DEFAULT_OVERWRITE_KEY = 
"fs.overwrite-files";
 
        /**
         * Key to specify whether the file systems should always create a 
directory for the output, even with a parallelism of one.
+        *
+        * @deprecated Use {@link 
CoreOptions#FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY} instead.
         */
+       @Deprecated
        public static final String 
FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY = 
"fs.output.always-create-directory";
 
        // ---------------------------- Compiler -------------------------------
@@ -1555,7 +1561,10 @@ public final class ConfigConstants {
 
        /**
         * The default behavior for output directory creating (create only 
directory when parallelism &gt; 1).
+        *
+        * @deprecated Use {@link 
CoreOptions#FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY} instead.
         */
+       @Deprecated
        public static final boolean DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY 
= false;
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index bdddd75..7f5e8da 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -26,6 +26,9 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
  * The set of configuration options for core parameters.
  */
 @PublicEvolving
+@ConfigGroups(groups = {
+       @ConfigGroup(name = "FileSystem", keyPrefix = "fs")
+})
 public class CoreOptions {
 
        // 
------------------------------------------------------------------------
@@ -127,7 +130,31 @@ public class CoreOptions {
         */
        public static final ConfigOption<String> DEFAULT_FILESYSTEM_SCHEME = 
ConfigOptions
                        .key("fs.default-scheme")
-                       .noDefaultValue();
+                       .noDefaultValue()
+                       .withDescription("The default filesystem scheme, used 
for paths that do not declare a scheme explicitly." +
+                               " May contain an authority, e.g. host:port in 
case of a HDFS NameNode.");
+
+       /**
+        * Specifies whether file output writers should overwrite existing 
files by default.
+        */
+       public static final ConfigOption<Boolean> FILESYTEM_DEFAULT_OVERRIDE =
+               key("fs.overwrite-files")
+                       .defaultValue(false)
+                       .withDescription("Specifies whether file output writers 
should overwrite existing files by default. Set to" +
+                               " \"true\" to overwrite by default,\"false\" 
otherwise.");
+
+       /**
+        * Specifies whether the file systems should always create a directory 
for the output, even with a parallelism of one.
+        */
+       public static final ConfigOption<Boolean> 
FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY =
+               key("fs.output.always-create-directory")
+                       .defaultValue(false)
+                       .withDescription("File writers running with a 
parallelism larger than one create a directory for the output" +
+                               " file path and put the different result files 
(one per parallel writer task) into that directory." +
+                               " If this option is set to \"true\", writers 
with a parallelism of 1 will also create a" +
+                               " directory and place a single result file into 
it. If the option is set to \"false\"," +
+                               " the writer will directly create the file 
directly at the output path, without creating a containing" +
+                               " directory.");
 
        /**
         * The total number of input plus output connections that a file system 
for the given scheme may open.

http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 5a96326..d7142f5 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
@@ -149,7 +150,7 @@ public class TestBaseUtils extends TestLogger {
                Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
                config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
TASK_MANAGER_MEMORY_SIZE);
-               
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
+               config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
 
                config.setString(AkkaOptions.ASK_TIMEOUT, 
DEFAULT_AKKA_ASK_TIMEOUT + "s");
                config.setString(AkkaOptions.STARTUP_TIMEOUT, 
DEFAULT_AKKA_STARTUP_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/flink/blob/29dcec5c/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 88c921d..03ca649 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -87,7 +88,7 @@ public abstract class CancelingTestBase extends TestLogger {
        public void startCluster() throws Exception {
                verifyJvmOptions();
                Configuration config = new Configuration();
-               
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
+               config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
4);
                config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

Reply via email to