Repository: flink
Updated Branches:
  refs/heads/master 20faf262d -> 2a612d9da


[FLINK-6590][docs] Integrate configuration docs generator

This closes #5119.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a612d9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a612d9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a612d9d

Branch: refs/heads/master
Commit: 2a612d9da3317e4e79c10be7970e6bdb55a1f0e3
Parents: 20faf26
Author: zentol <[email protected]>
Authored: Mon May 15 21:06:38 2017 +0200
Committer: zentol <[email protected]>
Committed: Fri Jan 19 23:28:43 2018 +0100

----------------------------------------------------------------------
 .../generated/history_server_configuration.html |  46 +++
 .../generated/kerberos_configuration.html       |  31 ++
 .../generated/metric_configuration.html         |  71 +++++
 .../queryable_state_configuration.html          |  46 +++
 docs/_includes/generated/web_configuration.html |  91 ++++++
 .../generated/zoo_keeper_configuration.html     |  26 ++
 docs/ops/config.md                              |  88 +-----
 docs/page/css/flink.css                         |   2 +-
 flink-core/pom.xml                              |  34 ---
 .../ConfigOptionsDocGenerator.java              | 281 ------------------
 .../configuration/HistoryServerOptions.java     |  22 +-
 .../flink/configuration/MetricOptions.java      |  37 ++-
 .../configuration/QueryableStateOptions.java    |  43 ++-
 .../flink/configuration/SecurityOptions.java    |  18 +-
 .../ConfigDocsCompletenessChecker.java          |  49 ----
 .../ConfigOptionsDocGeneratorTest.java          | 167 -----------
 flink-docs/README.md                            |  15 +-
 flink-docs/pom.xml                              |  69 +++++
 .../ConfigOptionsDocGenerator.java              | 294 +++++++++++++++++++
 .../ConfigDocsCompletenessChecker.java          |  49 ++++
 .../ConfigOptionsDocGeneratorTest.java          | 174 +++++++++++
 flink-libraries/flink-python/pom.xml            |   2 +-
 flink-yarn/pom.xml                              |  32 --
 23 files changed, 1007 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/_includes/generated/history_server_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/history_server_configuration.html 
b/docs/_includes/generated/history_server_configuration.html
new file mode 100644
index 0000000..daabb1d
--- /dev/null
+++ b/docs/_includes/generated/history_server_configuration.html
@@ -0,0 +1,46 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default Value</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>historyserver.archive.fs.dir</h5></td>
+            <td>(none)</td>
+            <td>Comma separated list of directories to fetch archived jobs 
from. The history server will monitor these directories for archived jobs. You 
can configure the JobManager to archive jobs to a directory via 
`jobmanager.archive.fs.dir`.</td>
+        </tr>
+        <tr>
+            <td><h5>historyserver.archive.fs.refresh-interval</h5></td>
+            <td>10000</td>
+            <td>Interval in milliseconds for refreshing the archived job 
directories.</td>
+        </tr>
+        <tr>
+            <td><h5>historyserver.web.address</h5></td>
+            <td>(none)</td>
+            <td>Address of the HistoryServer's web interface.</td>
+        </tr>
+        <tr>
+            <td><h5>historyserver.web.port</h5></td>
+            <td>8082</td>
+            <td>Port of the HistoryServer's web interface.</td>
+        </tr>
+        <tr>
+            <td><h5>historyserver.web.refresh-interval</h5></td>
+            <td>10000</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>historyserver.web.ssl.enabled</h5></td>
+            <td>false</td>
+            <td>Enable HTTPS access to the HistoryServer web frontend. This is 
applicable only when the global SSL flag security.ssl.enabled is set to 
true.</td>
+        </tr>
+        <tr>
+            <td><h5>historyserver.web.tmpdir</h5></td>
+            <td>(none)</td>
+            <td>This configuration parameter allows defining the Flink web 
directory to be used by the history server web interface. The web interface 
will copy its static files into the directory.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/_includes/generated/kerberos_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/kerberos_configuration.html 
b/docs/_includes/generated/kerberos_configuration.html
new file mode 100644
index 0000000..704d011
--- /dev/null
+++ b/docs/_includes/generated/kerberos_configuration.html
@@ -0,0 +1,31 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default Value</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>security.kerberos.login.contexts</h5></td>
+            <td>(none)</td>
+            <td>A comma-separated list of login contexts to provide the 
Kerberos credentials to (for example, `Client,KafkaClient` to use the 
credentials for ZooKeeper authentication and for Kafka authentication).</td>
+        </tr>
+        <tr>
+            <td><h5>security.kerberos.login.keytab</h5></td>
+            <td>(none)</td>
+            <td>Absolute path to a Kerberos keytab file that contains the user 
credentials.</td>
+        </tr>
+        <tr>
+            <td><h5>security.kerberos.login.principal</h5></td>
+            <td>(none)</td>
+            <td>Kerberos principal name associated with the keytab.</td>
+        </tr>
+        <tr>
+            <td><h5>security.kerberos.login.use-ticket-cache</h5></td>
+            <td>true</td>
+            <td>Indicates whether to read from your Kerberos ticket cache.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/_includes/generated/metric_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/metric_configuration.html 
b/docs/_includes/generated/metric_configuration.html
new file mode 100644
index 0000000..83f5abc
--- /dev/null
+++ b/docs/_includes/generated/metric_configuration.html
@@ -0,0 +1,71 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default Value</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>metrics.latency.history-size</h5></td>
+            <td>128</td>
+            <td>Defines the number of measured latencies to maintain at each 
operator.</td>
+        </tr>
+        <tr>
+            
<td><h5>metrics.reporter.&#60;name&#62;.&#60;parameter&#62;</h5></td>
+            <td>(none)</td>
+            <td>Configures the parameter &#60;parameter&#62; for the reporter 
named &#60;name&#62;.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.reporter.&#60;name&#62;.class</h5></td>
+            <td>(none)</td>
+            <td>The reporter class to use for the reporter named 
&#60;name&#62;.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.reporter.&#60;name&#62;.interval</h5></td>
+            <td>(none)</td>
+            <td>The reporter interval to use for the reporter named 
&#60;name&#62;.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.reporters</h5></td>
+            <td>(none)</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>metrics.scope.delimiter</h5></td>
+            <td>"."</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>metrics.scope.jm</h5></td>
+            <td>"&#60;host&#62;.jobmanager"</td>
+            <td>Defines the scope format string that is applied to all metrics 
scoped to a JobManager.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.scope.jm.job</h5></td>
+            <td>"&#60;host&#62;.jobmanager.&#60;job_name&#62;"</td>
+            <td>Defines the scope format string that is applied to all metrics 
scoped to a job on a JobManager.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.scope.operator</h5></td>
+            
<td>"&#60;host&#62;.taskmanager.&#60;tm_id&#62;.&#60;job_name&#62;.&#60;operator_name&#62;.&#60;subtask_index&#62;"</td>
+            <td>Defines the scope format string that is applied to all metrics 
scoped to an operator.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.scope.task</h5></td>
+            
<td>"&#60;host&#62;.taskmanager.&#60;tm_id&#62;.&#60;job_name&#62;.&#60;task_name&#62;.&#60;subtask_index&#62;"</td>
+            <td>Defines the scope format string that is applied to all metrics 
scoped to a task.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.scope.tm</h5></td>
+            <td>"&#60;host&#62;.taskmanager.&#60;tm_id&#62;"</td>
+            <td>Defines the scope format string that is applied to all metrics 
scoped to a TaskManager.</td>
+        </tr>
+        <tr>
+            <td><h5>metrics.scope.tm.job</h5></td>
+            
<td>"&#60;host&#62;.taskmanager.&#60;tm_id&#62;.&#60;job_name&#62;"</td>
+            <td>Defines the scope format string that is applied to all metrics 
scoped to a job on a TaskManager.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/_includes/generated/queryable_state_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/queryable_state_configuration.html 
b/docs/_includes/generated/queryable_state_configuration.html
new file mode 100644
index 0000000..53c5fbe
--- /dev/null
+++ b/docs/_includes/generated/queryable_state_configuration.html
@@ -0,0 +1,46 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default Value</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>query.client.network-threads</h5></td>
+            <td>0</td>
+            <td>Number of network (Netty's event loop) Threads for queryable 
state client.</td>
+        </tr>
+        <tr>
+            <td><h5>query.proxy.network-threads</h5></td>
+            <td>0</td>
+            <td>Number of network (Netty's event loop) Threads for queryable 
state proxy.</td>
+        </tr>
+        <tr>
+            <td><h5>query.proxy.ports</h5></td>
+            <td>"9069"</td>
+            <td>The port range of the queryable state proxy. The specified 
range can be a single port: "9123", a range of ports: "50100-50200", or a list 
of ranges and ports: "50100-50200,50300-50400,51234".</td>
+        </tr>
+        <tr>
+            <td><h5>query.proxy.query-threads</h5></td>
+            <td>0</td>
+            <td>Number of query Threads for queryable state proxy. Uses the 
number of slots if set to 0.</td>
+        </tr>
+        <tr>
+            <td><h5>query.server.network-threads</h5></td>
+            <td>0</td>
+            <td>Number of network (Netty's event loop) Threads for queryable 
state server.</td>
+        </tr>
+        <tr>
+            <td><h5>query.server.ports</h5></td>
+            <td>"9067"</td>
+            <td>The port range of the queryable state server. The specified 
range can be a single port: "9123", a range of ports: "50100-50200", or a list 
of ranges and ports: "50100-50200,50300-50400,51234".</td>
+        </tr>
+        <tr>
+            <td><h5>query.server.query-threads</h5></td>
+            <td>0</td>
+            <td>Number of query Threads for queryable state server. Uses the 
number of slots if set to 0.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/_includes/generated/web_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/web_configuration.html 
b/docs/_includes/generated/web_configuration.html
new file mode 100644
index 0000000..17bd673
--- /dev/null
+++ b/docs/_includes/generated/web_configuration.html
@@ -0,0 +1,91 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default Value</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>web.access-control-allow-origin</h5></td>
+            <td>"*"</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.address</h5></td>
+            <td>(none)</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.backpressure.cleanup-interval</h5></td>
+            <td>600000</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.backpressure.delay-between-samples</h5></td>
+            <td>50</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.backpressure.num-samples</h5></td>
+            <td>100</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.backpressure.refresh-interval</h5></td>
+            <td>60000</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.checkpoints.history</h5></td>
+            <td>10</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.history</h5></td>
+            <td>5</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.log.path</h5></td>
+            <td>(none)</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.port</h5></td>
+            <td>8081</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.refresh-interval</h5></td>
+            <td>3000</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.ssl.enabled</h5></td>
+            <td>true</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.submit.enable</h5></td>
+            <td>true</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.timeout</h5></td>
+            <td>10000</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.tmpdir</h5></td>
+            <td>(none)</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>web.upload.dir</h5></td>
+            <td>(none)</td>
+            <td></td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/_includes/generated/zoo_keeper_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/zoo_keeper_configuration.html 
b/docs/_includes/generated/zoo_keeper_configuration.html
new file mode 100644
index 0000000..3e5353c
--- /dev/null
+++ b/docs/_includes/generated/zoo_keeper_configuration.html
@@ -0,0 +1,26 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default Value</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>zookeeper.sasl.disable</h5></td>
+            <td>false</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>zookeeper.sasl.login-context-name</h5></td>
+            <td>"Client"</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>zookeeper.sasl.service-name</h5></td>
+            <td>"zookeeper"</td>
+            <td></td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 9bb7ef9..3f8fca9 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -392,27 +392,7 @@ These parameters allow for advanced tuning. The default 
values are sufficient wh
 
 ### Web Frontend
 
-- `web.port`: Port of the web interface that displays status of running jobs 
and execution time breakdowns of finished jobs (DEFAULT: 8081). Setting this 
value to `-1` disables the web frontend.
-
-- `web.history`: The number of latest jobs that the web front-end in its 
history (DEFAULT: 5).
-
-- `web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`).
-
-- `web.checkpoints.history`: Number of checkpoint statistics to remember 
(DEFAULT: `10`).
-
-- `web.backpressure.cleanup-interval`: Time after which cached stats are 
cleaned up if not accessed (DEFAULT: `600000`, 10 mins).
-
-- `web.backpressure.refresh-interval`: Time after which available stats are 
deprecated and need to be refreshed (DEFAULT: `60000`, 1 min).
-
-- `web.backpressure.num-samples`: Number of stack trace samples to take to 
determine back pressure (DEFAULT: `100`).
-
-- `web.backpressure.delay-between-samples`: Delay between stack trace samples 
to determine back pressure (DEFAULT: `50`, 50 ms).
-
-- `web.ssl.enabled`: Enable https access to the web frontend. This is 
applicable only when the global ssl flag security.ssl.enabled is set to true 
(DEFAULT: `true`).
-
-- `web.access-control-allow-origin`: Enable custom access control parameter 
for allow origin header, default is `*`.
-
-- `web.timeout`: Timeout for asynchronous operation executed by the web 
frontend in milliseconds (DEFAULT: `10000`, 10 s)
+{% include generated/web_configuration.html %}
 
 ### File Systems
 
@@ -564,21 +544,13 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 - `high-availability.zookeeper.client.acl`: (Default `open`) Defines the ACL 
(open\|creator) to be configured on ZK node. The configuration value can be set 
to "creator" if the ZooKeeper server configuration has the "authProvider" 
property mapped to use SASLAuthenticationProvider and the cluster is configured 
to run in secure mode (Kerberos). The ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
 
-#### ZooKeeper Security
-
-- `zookeeper.sasl.disable`: (Default: `true`) Defines if SASL based 
authentication needs to be enabled or disabled. The configuration value can be 
set to "true" if ZooKeeper cluster is running in secure mode (Kerberos).
+### ZooKeeper Security
 
-- `zookeeper.sasl.service-name`: (Default: `zookeeper`) If the ZooKeeper 
server is configured with a different service name (default:"zookeeper") then 
it can be supplied using this configuration. A mismatch in service name between 
client and server configuration will cause the authentication to fail.
+{% include generated/zoo_keeper_configuration.html %}
 
 ### Kerberos-based Security
 
-- `security.kerberos.login.use-ticket-cache`: Indicates whether to read from 
your Kerberos ticket cache (default: `true`).
-
-- `security.kerberos.login.keytab`: Absolute path to a Kerberos keytab file 
that contains the user credentials.
-
-- `security.kerberos.login.principal`: Kerberos principal name associated with 
the keytab.
-
-- `security.kerberos.login.contexts`: A comma-separated list of login contexts 
to provide the Kerberos credentials to (for example, `Client,KafkaClient` to 
use the credentials for ZooKeeper authentication and for Kafka authentication).
+{% include generated/kerberos_configuration.html %}
 
 ### Environment
 
@@ -590,47 +562,11 @@ Previously this key was named `recovery.mode` and the 
default value was `standal
 
 ### Queryable State
 
-#### Server
-
-- `query.server.enable`: Enable queryable state (Default: `true`).
-
-- `query.server.port`: Port to bind queryable state server to (Default: `0`, 
binds to random port).
-
-- `query.server.network-threads`: Number of network (Netty's event loop) 
Threads for queryable state server (Default: `0`, picks number of slots).
-
-- `query.server.query-threads`: Number of query Threads for queryable state 
server (Default: `0`, picks number of slots).
-
-#### Client
-
-- `query.client.network-threads`: Number of network (Netty's event loop) 
Threads for queryable state client (Default: `0`, picks number of available 
cores as returned by `Runtime.getRuntime().availableProcessors()`).
-
-- `query.client.lookup.num-retries`: Number of retries on KvState lookup 
failure due to unavailable JobManager (Default: `3`).
-
-- `query.client.lookup.retry-delay`: Retry delay in milliseconds on KvState 
lookup failure due to unavailable JobManager (Default: `1000`).
+{% include generated/queryable_state_configuration.html %}
 
 ### Metrics
 
-- `metrics.reporters`: The list of named reporters, i.e. "foo,bar".
-
-- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the 
reporter named `<name>`.
-
-- `metrics.reporter.<name>.class`: The reporter class to use for the reporter 
named `<name>`.
-
-- `metrics.reporter.<name>.interval`: The reporter interval to use for the 
reporter named `<name>`.
-
-- `metrics.scope.jm`: (Default: &lt;host&gt;.jobmanager) Defines the scope 
format string that is applied to all metrics scoped to a JobManager.
-
-- `metrics.scope.jm.job`: (Default: &lt;host&gt;.jobmanager.&lt;job_name&gt;) 
Defines the scope format string that is applied to all metrics scoped to a job 
on a JobManager.
-
-- `metrics.scope.tm`: (Default: &lt;host&gt;.taskmanager.&lt;tm_id&gt;) 
Defines the scope format string that is applied to all metrics scoped to a 
TaskManager.
-
-- `metrics.scope.tm.job`: (Default: 
&lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;) Defines the scope 
format string that is applied to all metrics scoped to a job on a TaskManager.
-
-- `metrics.scope.task`: (Default: 
&lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;task_name&gt;.&lt;subtask_index&gt;)
 Defines the scope format string that is applied to all metrics scoped to a 
task.
-
-- `metrics.scope.operator`: (Default: 
&lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;operator_name&gt;.&lt;subtask_index&gt;)
 Defines the scope format string that is applied to all metrics scoped to an 
operator.
-
-- `metrics.latency.history-size`: (Default: 128) Defines the number of 
measured latencies to maintain at each operator
+{% include generated/metric_configuration.html %}
 
 ### History Server
 
@@ -638,17 +574,7 @@ You have to configure `jobmanager.archive.fs.dir` in order 
to archive terminated
 
 - `jobmanager.archive.fs.dir`: Directory to upload information about 
terminated jobs to. You have to add this directory to the list of monitored 
directories of the history server via `historyserver.archive.fs.dir`.
 
-- `historyserver.archive.fs.dir`: Comma separated list of directories to fetch 
archived jobs from. The history server will monitor these directories for 
archived jobs. You can configure the JobManager to archive jobs to a directory 
via `jobmanager.archive.fs.dir`.
-
-- `historyserver.archive.fs.refresh-interval`: Interval in milliseconds for 
refreshing the archived job directories (DEFAULT: `10000`).
-
-- `historyserver.web.tmpdir`: This configuration parameter allows defining the 
Flink web directory to be used by the history server web interface. The web 
interface will copy its static files into the directory (DEFAULT: local system 
temporary directory).
-
-- `historyserver.web.address`: Address of the HistoryServer's web interface 
(DEFAULT: `anyLocalAddress()`).
-
-- `historyserver.web.port`: Port of the HistoryServers's web interface 
(DEFAULT: `8082`).
-
-- `historyserver.web.ssl.enabled`: Enable HTTPs access to the HistoryServer 
web frontend. This is applicable only when the global SSL flag 
security.ssl.enabled is set to true (DEFAULT: `false`).
+{% include generated/history_server_configuration.html %}
 
 ### Slot Manager (Flip-6)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/docs/page/css/flink.css
----------------------------------------------------------------------
diff --git a/docs/page/css/flink.css b/docs/page/css/flink.css
index c0a7917..a17027d 100644
--- a/docs/page/css/flink.css
+++ b/docs/page/css/flink.css
@@ -102,7 +102,7 @@ h2 {
        border-bottom: 1px solid #e5e5e5;
 }
 
-h2, h3, h4, h5, h6, h7 {
+h2, h3, h4 {
        padding-top: 1em;
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 285074e..7ec8097 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -122,40 +122,6 @@ under the License.
 
        </dependencies>
 
-       <profiles>
-               <profile>
-                       <id>generate-config-docs</id>
-
-                       <build>
-                               <plugins>
-                                       <plugin>
-                                               
<artifactId>maven-antrun-plugin</artifactId>
-                                               <version>1.7</version>
-                                               <executions>
-                                                       <execution>
-                                                               
<phase>package</phase>
-                                                               <goals>
-                                                                       
<goal>run</goal>
-                                                               </goals>
-                                                       </execution>
-                                               </executions>
-                                               <configuration>
-                                                       <target>
-                                                               <mkdir 
dir="${rootDir}/${generated.docs.dir}"/>
-                                                               <java 
classname="org.apache.flink.configuration.ConfigOptionsDocGenerator" 
fork="true">
-                                                                       
<classpath refid="maven.compile.classpath" />
-                                                                       <arg 
value="${rootDir}/${generated.docs.dir}/" />
-                                                                       
<!--package with configuration classes-->
-                                                                       <arg 
value="org.apache.flink.configuration" />
-                                                               </java>
-                                                       </target>
-                                               </configuration>
-                                       </plugin>
-                               </plugins>
-                       </build>
-               </profile>
-       </profiles>
-
        <build>
                <plugins>
                        <plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
deleted file mode 100644
index 02ce7bb..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptionsDocGenerator.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Class used for generating code based documentation of configuration 
parameters.
- */
-public class ConfigOptionsDocGenerator {
-
-       /**
-        * This method generates html tables from set of classes containing 
{@link ConfigOption ConfigOptions}.
-        *
-        * <p>For each class 1 or more html tables will be generated and placed 
into a separate file, depending on whether
-        * the class is annotated with {@link ConfigGroups}. The tables contain 
the key, default value and description for
-        * every {@link ConfigOption}.
-        *
-        * @param args first argument is output path for the generated files, 
second argument is full package name containing
-        *             classes with {@link ConfigOption}
-        */
-       public static void main(String[] args) throws IOException, 
ClassNotFoundException {
-               String outputPath = args[0];
-               String packageName = args[1];
-
-               Path configDir = Paths.get("../src/main/java", 
packageName.replaceAll("\\.", "/"));
-
-               Pattern p = Pattern.compile("(([a-zA-Z]*)(Options))\\.java");
-               try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(configDir, "*Options.java")) {
-                       for (Path entry : stream) {
-                               String fileName = 
entry.getFileName().toString();
-                               Matcher matcher = p.matcher(fileName);
-                               if (!fileName.equals("ConfigOptions.java") && 
matcher.matches()) {
-                                       Class<?> optionsClass = 
Class.forName(packageName + "." + matcher.group(1));
-                                       List<Tuple2<ConfigGroup, String>> 
tables = generateTablesForClass(optionsClass);
-                                       if (tables.size() > 0) {
-                                               for (Tuple2<ConfigGroup, 
String> group : tables) {
-
-                                                       String name = group.f0 
== null
-                                                               ? 
matcher.group(2).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase()
-                                                               : 
group.f0.name().replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase();
-
-                                                       String outputFile = 
name + "_configuration.html";
-                                                       
Files.write(Paths.get(outputPath, outputFile), 
group.f1.getBytes(StandardCharsets.UTF_8));
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-
-       @VisibleForTesting
-       static List<Tuple2<ConfigGroup, String>> 
generateTablesForClass(Class<?> optionsClass) {
-               ConfigGroups configGroups = 
optionsClass.getAnnotation(ConfigGroups.class);
-               List<Tuple2<ConfigGroup, String>> tables = new ArrayList<>();
-               List<ConfigOption> allOptions = 
extractConfigOptions(optionsClass);
-
-               if (configGroups != null) {
-                       Tree tree = new Tree(configGroups.groups(), allOptions);
-
-                       for (ConfigGroup group : configGroups.groups()) {
-                               List<ConfigOption> configOptions = 
tree.findConfigOptions(group);
-                               sortOptions(configOptions);
-                               tables.add(Tuple2.of(group, 
toHtmlTable(configOptions)));
-                       }
-                       List<ConfigOption> configOptions = 
tree.getDefaultOptions();
-                       sortOptions(configOptions);
-                       tables.add(Tuple2.<ConfigGroup, String>of(null, 
toHtmlTable(configOptions)));
-               } else {
-                       sortOptions(allOptions);
-                       tables.add(Tuple2.<ConfigGroup, String>of(null, 
toHtmlTable(allOptions)));
-               }
-               return tables;
-       }
-
-       private static List<ConfigOption> extractConfigOptions(Class<?> clazz) {
-               try {
-                       List<ConfigOption> configOptions = new ArrayList<>();
-                       Field[] fields = clazz.getFields();
-                       for (Field field : fields) {
-                               if (field.getType().equals(ConfigOption.class) 
&& field.getAnnotation(Deprecated.class) == null) {
-                                       configOptions.add((ConfigOption) 
field.get(null));
-                               }
-                       }
-
-                       return configOptions;
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to extract config 
options from class " + clazz + ".", e);
-               }
-       }
-
-
-       /**
-        * Transforms this configuration group into HTML formatted table.
-        * Options are sorted alphabetically by key.
-        *
-        * @param options list of options to include in this group
-        * @return string containing HTML formatted table
-        */
-       private static String toHtmlTable(final List<ConfigOption> options) {
-               StringBuilder htmlTable = new StringBuilder(
-                       "<table class=\"table table-bordered\"><thead><tr><th 
class=\"text-left\" style=\"width: 20%\">Key</th>" +
-                       "<th class=\"text-left\" style=\"width: 15%\">Default 
Value</th><th class=\"text-left\" " +
-                       "style=\"width: 
65%\">Description</th></tr></thead><tbody>");
-
-               for (ConfigOption option : options) {
-                       htmlTable.append(toHtmlString(option));
-               }
-
-               htmlTable.append("</tbody></table>");
-
-               return htmlTable.toString();
-       }
-
-       /**
-        * Transforms option to table row.
-        *
-        * @param option option to transform
-        * @return row with the option description
-        */
-       private static String toHtmlString(final ConfigOption<?> option) {
-               Object defaultValue = option.defaultValue();
-               // This is a temporary hack that should be removed once 
FLINK-6490 is resolved.
-               // These options use System.getProperty("java.io.tmpdir") as 
the default.
-               // As a result the generated table contains an actual path as 
the default, which is simply wrong.
-               if (option == WebOptions.TMP_DIR || 
option.key().equals("python.dc.tmp.dir")) {
-                       defaultValue = null;
-               }
-               return "<tr>" +
-                       "<td><h5>" + escapeCharacters(option.key()) + 
"</h5></td>" +
-                       "<td>" + 
escapeCharacters(defaultValueToHtml(defaultValue)) + "</td>" +
-                       "<td>" + escapeCharacters(option.description()) + 
"</td>" +
-                       "</tr>";
-       }
-
-       private static String defaultValueToHtml(Object value) {
-               if (value instanceof String) {
-                       if (((String) value).isEmpty()) {
-                               return "(none)";
-                       }
-                       return "\"" + value + "\"";
-               }
-
-               return value == null ? "(none)" : value.toString();
-       }
-
-       private static String escapeCharacters(String value) {
-               return value
-                       .replaceAll("<", "&#60;")
-                       .replaceAll(">", "&#62;");
-       }
-
-       private static void sortOptions(List<ConfigOption> configOptions) {
-               Collections.sort(configOptions, new Comparator<ConfigOption>() {
-                       @Override
-                       public int compare(ConfigOption o1, ConfigOption o2) {
-                               return o1.key().compareTo(o2.key());
-                       }
-               });
-       }
-
-       /**
-        * Data structure used to assign {@link ConfigOption ConfigOptions} to 
the {@link ConfigGroup} with the longest
-        * matching prefix.
-        */
-       private static class Tree {
-               private final Node root = new Node();
-
-               Tree(ConfigGroup[] groups, Collection<ConfigOption> options) {
-                       // generate a tree based on all key prefixes
-                       for (ConfigGroup group : groups) {
-                               String[] keyComponents = 
group.keyPrefix().split("\\.");
-                               Node currentNode = root;
-                               for (String keyComponent : keyComponents) {
-                                       currentNode = 
currentNode.addChild(keyComponent);
-                               }
-                               currentNode.markAsGroupRoot();
-                       }
-
-                       // assign options to their corresponding group, i.e. 
the last group root node encountered when traversing
-                       // the tree based on the option key
-                       for (ConfigOption<?> option : options) {
-                               
findGroupRoot(option.key()).assignOption(option);
-                       }
-               }
-
-               List<ConfigOption> findConfigOptions(ConfigGroup configGroup) {
-                       Node groupRoot = findGroupRoot(configGroup.keyPrefix());
-                       return groupRoot.getConfigOptions();
-               }
-
-               List<ConfigOption> getDefaultOptions() {
-                       return root.getConfigOptions();
-               }
-
-               private Node findGroupRoot(String key) {
-                       String[] keyComponents = key.split("\\.");
-                       Node currentNode = root;
-                       for (String keyComponent : keyComponents) {
-                               currentNode = 
currentNode.findChild(keyComponent);
-                       }
-                       return currentNode.isGroupRoot() ? currentNode : root;
-               }
-
-               private static class Node {
-                       private final List<ConfigOption> configOptions = new 
ArrayList<>();
-                       private final Map<String, Node> children = new 
HashMap<>();
-                       private boolean isGroupRoot = false;
-
-                       private Node addChild(String keyComponent) {
-                               Node child = children.get(keyComponent);
-                               if (child == null) {
-                                       child = new Node();
-                                       children.put(keyComponent, child);
-                               }
-                               return child;
-                       }
-
-                       private Node findChild(String keyComponent) {
-                               Node child = children.get(keyComponent);
-                               if (child == null) {
-                                       return this;
-                               }
-                               return child;
-                       }
-
-                       private void assignOption(ConfigOption option) {
-                               configOptions.add(option);
-                       }
-
-                       private boolean isGroupRoot() {
-                               return isGroupRoot;
-                       }
-
-                       private void markAsGroupRoot() {
-                               this.isGroupRoot = true;
-                       }
-
-                       private List<ConfigOption> getConfigOptions() {
-                               return configOptions;
-                       }
-               }
-       }
-
-       private ConfigOptionsDocGenerator() {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
index c7c6933..a16fd7f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java
@@ -33,35 +33,43 @@ public class HistoryServerOptions {
         */
        public static final ConfigOption<Long> 
HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL =
                key("historyserver.archive.fs.refresh-interval")
-                       .defaultValue(10000L);
+                       .defaultValue(10000L)
+                       .withDescription("Interval in milliseconds for 
refreshing the archived job directories.");
 
        /**
         * Comma-separated list of directories which the HistoryServer polls 
for new archives.
         */
        public static final ConfigOption<String> HISTORY_SERVER_ARCHIVE_DIRS =
                key("historyserver.archive.fs.dir")
-                       .noDefaultValue();
+                       .noDefaultValue()
+                       .withDescription("Comma separated list of directories 
to fetch archived jobs from. The history server will" +
+                               " monitor these directories for archived jobs. 
You can configure the JobManager to archive jobs to a" +
+                               " directory via `jobmanager.archive.fs.dir`.");
 
        /**
         * The local directory used by the HistoryServer web-frontend.
         */
        public static final ConfigOption<String> HISTORY_SERVER_WEB_DIR =
                key("historyserver.web.tmpdir")
-                       .noDefaultValue();
+                       .noDefaultValue()
+                       .withDescription("This configuration parameter allows 
defining the Flink web directory to be used by the" +
+                               " history server web interface. The web 
interface will copy its static files into the directory.");
 
        /**
         * The address under which the HistoryServer web-frontend is accessible.
         */
        public static final ConfigOption<String> HISTORY_SERVER_WEB_ADDRESS =
                key("historyserver.web.address")
-                       .noDefaultValue();
+                       .noDefaultValue()
+                       .withDescription("Address of the HistoryServer's web 
interface.");
 
        /**
         * The port under which the HistoryServer web-frontend is accessible.
         */
        public static final ConfigOption<Integer> HISTORY_SERVER_WEB_PORT =
                key("historyserver.web.port")
-                       .defaultValue(8082);
+                       .defaultValue(8082)
+                       .withDescription("Port of the HistoryServer's web 
interface.");
 
        /**
         * The refresh interval for the HistoryServer web-frontend in 
milliseconds.
@@ -76,7 +84,9 @@ public class HistoryServerOptions {
         */
        public static final ConfigOption<Boolean> 
HISTORY_SERVER_WEB_SSL_ENABLED =
                key("historyserver.web.ssl.enabled")
-                       .defaultValue(false);
+                       .defaultValue(false)
+                       .withDescription("Enable HTTPS access to the 
HistoryServer web frontend. This is applicable only when the" +
+                               " global SSL flag security.ssl.enabled is set 
to true.");
 
        private HistoryServerOptions() {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 24655fe..3b11645 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -47,6 +47,22 @@ public class MetricOptions {
                key("metrics.reporters")
                        .noDefaultValue();
 
+       public static final ConfigOption<String> REPORTER_CLASS =
+               key("metrics.reporter.<name>.class")
+                       .noDefaultValue()
+                       .withDescription("The reporter class to use for the 
reporter named <name>.");
+
+       public static final ConfigOption<String> REPORTER_INTERVAL =
+               key("metrics.reporter.<name>.interval")
+                       .noDefaultValue()
+                       .withDescription("The reporter interval to use for the 
reporter named <name>.");
+
+       public static final ConfigOption<String> REPORTER_CONFIG_PARAMETER =
+               key("metrics.reporter.<name>.<parameter>")
+                       .noDefaultValue()
+                       .withDescription("Configures the parameter <parameter> 
for the reporter named <name>.");
+
+
        /** The delimiter used to assemble the metric identifier. */
        public static final ConfigOption<String> SCOPE_DELIMITER =
                key("metrics.scope.delimiter")
@@ -55,37 +71,44 @@ public class MetricOptions {
        /** The scope format string that is applied to all metrics scoped to a 
JobManager. */
        public static final ConfigOption<String> SCOPE_NAMING_JM =
                key("metrics.scope.jm")
-                       .defaultValue("<host>.jobmanager");
+                       .defaultValue("<host>.jobmanager")
+                       .withDescription("Defines the scope format string that 
is applied to all metrics scoped to a JobManager.");
 
        /** The scope format string that is applied to all metrics scoped to a 
TaskManager. */
        public static final ConfigOption<String> SCOPE_NAMING_TM =
                key("metrics.scope.tm")
-                       .defaultValue("<host>.taskmanager.<tm_id>");
+                       .defaultValue("<host>.taskmanager.<tm_id>")
+                       .withDescription("Defines the scope format string that 
is applied to all metrics scoped to a TaskManager.");
 
        /** The scope format string that is applied to all metrics scoped to a 
job on a JobManager. */
        public static final ConfigOption<String> SCOPE_NAMING_JM_JOB =
                key("metrics.scope.jm.job")
-                       .defaultValue("<host>.jobmanager.<job_name>");
+                       .defaultValue("<host>.jobmanager.<job_name>")
+                       .withDescription("Defines the scope format string that 
is applied to all metrics scoped to a job on a JobManager.");
 
        /** The scope format string that is applied to all metrics scoped to a 
job on a TaskManager. */
        public static final ConfigOption<String> SCOPE_NAMING_TM_JOB =
                key("metrics.scope.tm.job")
-                       .defaultValue("<host>.taskmanager.<tm_id>.<job_name>");
+                       .defaultValue("<host>.taskmanager.<tm_id>.<job_name>")
+                       .withDescription("Defines the scope format string that 
is applied to all metrics scoped to a job on a TaskManager.");
 
        /** The scope format string that is applied to all metrics scoped to a 
task. */
        public static final ConfigOption<String> SCOPE_NAMING_TASK =
                key("metrics.scope.task")
-                       
.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");
+                       
.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>")
+                       .withDescription("Defines the scope format string that 
is applied to all metrics scoped to a task.");
 
        /** The scope format string that is applied to all metrics scoped to an 
operator. */
        public static final ConfigOption<String> SCOPE_NAMING_OPERATOR =
                key("metrics.scope.operator")
-                       
.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>");
+                       
.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>")
+                       .withDescription("Defines the scope format string that 
is applied to all metrics scoped to an operator.");
 
        /** The number of measured latencies to maintain at each operator. */
        public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
                key("metrics.latency.history-size")
-                       .defaultValue(128);
+                       .defaultValue(128)
+                       .withDescription("Defines the number of measured 
latencies to maintain at each operator.");
 
        private MetricOptions() {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index ac88bed..6a80419 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -45,24 +45,29 @@ public class QueryableStateOptions {
         * <ol>
         *     <li>a port: "9123",
         *     <li>a range of ports: "50100-50200", or
-        *     <li>a list of ranges and or points: 
"50100-50200,50300-50400,51234"
+        *     <li>a list of ranges and ports: "50100-50200,50300-50400,51234"
         * </ol>
         *
         * <p><b>The default port is 9069.</b>
         */
        public static final ConfigOption<String> PROXY_PORT_RANGE =
-                       key("query.proxy.ports")
-                       .defaultValue("9069");
+               key("query.proxy.ports")
+                       .defaultValue("9069")
+                       .withDescription("The port range of the queryable state 
proxy. The specified range can be a single " +
+                               "port: \"9123\", a range of ports: 
\"50100-50200\", " +
+                               "or a list of ranges and ports: 
\"50100-50200,50300-50400,51234\".");
 
        /** Number of network (event loop) threads for the client proxy (0 => 
#slots). */
        public static final ConfigOption<Integer> PROXY_NETWORK_THREADS =
-                       key("query.proxy.network-threads")
-                                       .defaultValue(0);
+               key("query.proxy.network-threads")
+                       .defaultValue(0)
+                       .withDescription("Number of network (Netty's event 
loop) Threads for queryable state proxy.");
 
        /** Number of async query threads for the client proxy (0 => #slots). */
        public static final ConfigOption<Integer> PROXY_ASYNC_QUERY_THREADS =
-                       key("query.proxy.query-threads")
-                                       .defaultValue(0);
+               key("query.proxy.query-threads")
+                       .defaultValue(0)
+                       .withDescription("Number of query Threads for queryable 
state proxy. Uses the number of slots if set to 0.");
 
        /**
         * The config parameter defining the server port range of the queryable 
state server.
@@ -77,24 +82,29 @@ public class QueryableStateOptions {
         * <ol>
         *     <li>a port: "9123",
         *     <li>a range of ports: "50100-50200", or
-        *     <li>a list of ranges and or points: 
"50100-50200,50300-50400,51234"
+        *     <li>a list of ranges and ports: "50100-50200,50300-50400,51234"
         * </ol>
         *
         * <p><b>The default port is 9067.</b>
         */
        public static final ConfigOption<String> SERVER_PORT_RANGE =
-                       key("query.server.ports")
-                       .defaultValue("9067");
+               key("query.server.ports")
+                       .defaultValue("9067")
+                       .withDescription("The port range of the queryable state 
server. The specified range can be a single " +
+                               "port: \"9123\", a range of ports: 
\"50100-50200\", " +
+                               "or a list of ranges and ports: 
\"50100-50200,50300-50400,51234\".");
 
        /** Number of network (event loop) threads for the KvState server (0 => 
#slots). */
        public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =
-                       key("query.server.network-threads")
-                       .defaultValue(0);
+               key("query.server.network-threads")
+                       .defaultValue(0)
+                       .withDescription("Number of network (Netty's event 
loop) Threads for queryable state server.");
 
        /** Number of async query threads for the KvStateServerHandler (0 => 
#slots). */
        public static final ConfigOption<Integer> SERVER_ASYNC_QUERY_THREADS =
-                       key("query.server.query-threads")
-                       .defaultValue(0);
+               key("query.server.query-threads")
+                       .defaultValue(0)
+                       .withDescription("Number of query Threads for queryable 
state server. Uses the number of slots if set to 0.");
 
        // 
------------------------------------------------------------------------
        // Client Options
@@ -102,8 +112,9 @@ public class QueryableStateOptions {
 
        /** Number of network (event loop) threads for the KvState client (0 => 
Use number of available cores). */
        public static final ConfigOption<Integer> CLIENT_NETWORK_THREADS =
-                       key("query.client.network-threads")
-                       .defaultValue(0);
+               key("query.client.network-threads")
+                       .defaultValue(0)
+                       .withDescription("Number of network (Netty's event 
loop) Threads for queryable state client.");
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 2c353d8..a2e0d1b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -26,6 +26,10 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
  * The set of configuration options relating to security.
  */
 @PublicEvolving
+@ConfigGroups(groups = {
+       @ConfigGroup(name = "Kerberos", keyPrefix = "security.kerberos"),
+       @ConfigGroup(name = "ZooKeeper", keyPrefix = "zookeeper")
+})
 public class SecurityOptions {
 
        // 
------------------------------------------------------------------------
@@ -35,20 +39,26 @@ public class SecurityOptions {
        public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL =
                key("security.kerberos.login.principal")
                        .noDefaultValue()
-                       .withDeprecatedKeys("security.principal");
+                       .withDeprecatedKeys("security.principal")
+                       .withDescription("Kerberos principal name associated 
with the keytab.");
 
        public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB =
                key("security.kerberos.login.keytab")
                        .noDefaultValue()
-                       .withDeprecatedKeys("security.keytab");
+                       .withDeprecatedKeys("security.keytab")
+                       .withDescription("Absolute path to a Kerberos keytab 
file that contains the user credentials.");
 
        public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE 
=
                key("security.kerberos.login.use-ticket-cache")
-                       .defaultValue(true);
+                       .defaultValue(true)
+                       .withDescription("Indicates whether to read from your 
Kerberos ticket cache.");
 
        public static final ConfigOption<String> KERBEROS_LOGIN_CONTEXTS =
                key("security.kerberos.login.contexts")
-                       .noDefaultValue();
+                       .noDefaultValue()
+                       .withDescription("A comma-separated list of login 
contexts to provide the Kerberos credentials to" +
+                               " (for example, `Client,KafkaClient` to use the 
credentials for ZooKeeper authentication and for" +
+                               " Kafka authentication).");
 
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/src/test/java/org/apache/flink/configuration/ConfigDocsCompletenessChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigDocsCompletenessChecker.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigDocsCompletenessChecker.java
deleted file mode 100644
index 3db85e7..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigDocsCompletenessChecker.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-/**
- * A small utility that collects all config keys that are not described in
- * the configuration reference documentation.
- */
-public class ConfigDocsCompletenessChecker {
-
-       public static void main(String[] args) throws Exception {
-
-               String configFileContents = FileUtils.readFileToString(new 
File("docs/setup/config.md"));
-               Field[] fields = ConfigConstants.class.getFields();
-
-               for (Field field : fields) {
-                       if (Modifier.isStatic(field.getModifiers()) && 
field.getType().equals(String.class) &&
-                                       !field.getName().startsWith("DEFAULT")) 
{
-
-                               Object val = field.get(null);
-                               if (!configFileContents.contains((String) val)) 
{
-                                       System.out.println("++++ " + val + " is 
not mentioned in the configuration file!!!");
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionsDocGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionsDocGeneratorTest.java
 
b/flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionsDocGeneratorTest.java
deleted file mode 100644
index 0959f15..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionsDocGeneratorTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the ConfigOptionsDocGenerator.
- */
-@SuppressWarnings("unused")
-public class ConfigOptionsDocGeneratorTest {
-
-       static class TestConfigGroup {
-               public static ConfigOption<Integer> firstOption = ConfigOptions
-                       .key("first.option.a")
-                       .defaultValue(2)
-                       .withDescription("This is example description for the 
first option.");
-
-               public static ConfigOption<String> secondOption = ConfigOptions
-                       .key("second.option.a")
-                       .noDefaultValue()
-                       .withDescription("This is long example description for 
the second option.");
-       }
-
-       @Test
-       public void testCreatingDescription() throws Exception {
-               final String expectedTable = "<table class=\"table 
table-bordered\">" +
-                       "<thead>" +
-                       "<tr>" +
-                       "<th class=\"text-left\" style=\"width: 20%\">Key</th>" 
+
-                       "<th class=\"text-left\" style=\"width: 15%\">Default 
Value</th>" +
-                       "<th class=\"text-left\" style=\"width: 
65%\">Description</th>" +
-                       "</tr>" +
-                       "</thead>" +
-                       "<tbody>" +
-                       "<tr>" +
-                       "<td><h5>first.option.a</h5></td>" +
-                       "<td>2</td>" +
-                       "<td>This is example description for the first 
option.</td>" +
-                       "</tr>" +
-                       "<tr>" +
-                       "<td><h5>second.option.a</h5></td>" +
-                       "<td>(none)</td>" +
-                       "<td>This is long example description for the second 
option.</td>" +
-                       "</tr>" +
-                       "</tbody>" +
-                       "</table>";
-               final String htmlTable = 
ConfigOptionsDocGenerator.generateTablesForClass(TestConfigGroup.class).get(0).f1;
-
-               assertEquals(expectedTable, htmlTable);
-       }
-
-       @ConfigGroups(groups = {
-       @ConfigGroup(name = "firstGroup", keyPrefix = "first"),
-       @ConfigGroup(name = "secondGroup", keyPrefix = "second")})
-       static class TestConfigMultipleSubGroup {
-               public static ConfigOption<Integer> firstOption = ConfigOptions
-                       .key("first.option.a")
-                       .defaultValue(2)
-                       .withDescription("This is example description for the 
first option.");
-
-               public static ConfigOption<String> secondOption = ConfigOptions
-                       .key("second.option.a")
-                       .noDefaultValue()
-                       .withDescription("This is long example description for 
the second option.");
-
-               public static ConfigOption<Integer> thirdOption = ConfigOptions
-                       .key("third.option.a")
-                       .defaultValue(2)
-                       .withDescription("This is example description for the 
third option.");
-
-               public static ConfigOption<String> fourthOption = ConfigOptions
-                       .key("fourth.option.a")
-                       .noDefaultValue()
-                       .withDescription("This is long example description for 
the fourth option.");
-       }
-
-       @Test
-       public void testCreatingMultipleGroups() throws Exception {
-               final List<Tuple2<ConfigGroup, String>> tables = 
ConfigOptionsDocGenerator.generateTablesForClass(
-                       TestConfigMultipleSubGroup.class);
-
-               assertEquals(tables.size(), 3);
-               final HashMap<String, String> tablesConverted = new HashMap<>();
-               for (Tuple2<ConfigGroup, String> table : tables) {
-                       tablesConverted.put(table.f0 != null ? table.f0.name() 
: "default", table.f1);
-               }
-
-               assertEquals("<table class=\"table table-bordered\">" +
-                               "<thead>" +
-                               "<tr>" +
-                               "<th class=\"text-left\" style=\"width: 
20%\">Key</th>" +
-                               "<th class=\"text-left\" style=\"width: 
15%\">Default Value</th>" +
-                               "<th class=\"text-left\" style=\"width: 
65%\">Description</th>" +
-                               "</tr>" +
-                               "</thead>" +
-                               "<tbody>" +
-                               "<tr>" +
-                               "<td><h5>first.option.a</h5></td>" +
-                               "<td>2</td>" +
-                               "<td>This is example description for the first 
option.</td>" +
-                               "</tr>" +
-                               "</tbody>" +
-                               "</table>", tablesConverted.get("firstGroup"));
-               assertEquals("<table class=\"table table-bordered\">" +
-                               "<thead>" +
-                               "<tr>" +
-                               "<th class=\"text-left\" style=\"width: 
20%\">Key</th>" +
-                               "<th class=\"text-left\" style=\"width: 
15%\">Default Value</th>" +
-                               "<th class=\"text-left\" style=\"width: 
65%\">Description</th>" +
-                               "</tr>" +
-                               "</thead>" +
-                               "<tbody>" +
-                               "<tr>" +
-                               "<td><h5>second.option.a</h5></td>" +
-                               "<td>(none)</td>" +
-                               "<td>This is long example description for the 
second option.</td>" +
-                               "</tr>" +
-                               "</tbody>" +
-                               "</table>", tablesConverted.get("secondGroup"));
-               assertEquals("<table class=\"table table-bordered\">" +
-                               "<thead>" +
-                               "<tr>" +
-                               "<th class=\"text-left\" style=\"width: 
20%\">Key</th>" +
-                               "<th class=\"text-left\" style=\"width: 
15%\">Default Value</th>" +
-                               "<th class=\"text-left\" style=\"width: 
65%\">Description</th>" +
-                               "</tr>" +
-                               "</thead>" +
-                               "<tbody>" +
-                               "<tr>" +
-                               "<td><h5>fourth.option.a</h5></td>" +
-                               "<td>(none)</td>" +
-                               "<td>This is long example description for the 
fourth option.</td>" +
-                               "</tr>" +
-                               "<tr>" +
-                               "<td><h5>third.option.a</h5></td>" +
-                               "<td>2</td>" +
-                               "<td>This is example description for the third 
option.</td>" +
-                               "</tr>" +
-                               "</tbody>" +
-                               "</table>", tablesConverted.get("default"));
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-docs/README.md
----------------------------------------------------------------------
diff --git a/flink-docs/README.md b/flink-docs/README.md
index 974417f..b6de6f1 100644
--- a/flink-docs/README.md
+++ b/flink-docs/README.md
@@ -33,4 +33,17 @@ To integrate a new endpoint into the generator
 
 The documentation must be regenerated whenever
 * a handler is added to/removed from a `RestServerEndpoint`
-* any used `MessageHeaders` class or any referenced `RequestBody`, 
`ResponseBody`, `MessageParameters` or `MessageParameter` class is modified.
\ No newline at end of file
+* any used `MessageHeaders` class or any referenced `RequestBody`, 
`ResponseBody`, `MessageParameters` or `MessageParameter` class is modified.
+
+## Configuration documentation
+
+The `ConfigOptionsDocGenerator` can be used to generate a reference of 
`ConfigOptions`. By default, a separate file is generated for each `*Options` 
class found in `org.apache.flink.configuration` and 
`org.apache.flink.yarn.configuration`. The `@ConfigGroups` annotation can be 
used to generate multiple files from a single class.
+
+To integrate an `*Options` class from another package, add another 
module-package argument pair to the `maven-antrun-plugin` configuration in the 
`generate-config-docs` profile.
+
+The files can be generated by running `mvn package -Dgenerate-config-docs`, 
and can be integrated into the documentation using `{% include 
generated/<file-name>.html %}`.
+
+The documentation must be regenerated whenever
+* an `*Options` class was added or removed
+* a `ConfigOption` was added to or removed from a `*Options` class
+* a `ConfigOption` was modified in any way.

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index d271009..68a442d 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -48,6 +48,11 @@ under the License.
                        
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-yarn_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -80,6 +85,14 @@ under the License.
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                </dependency>
+
+               <dependency>
+                       <!-- Required because the surefire plugin tries to load 
the Flip6 category annotation -->
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
@@ -91,6 +104,23 @@ under the License.
                                        <skip>true</skip>
                                </configuration>
                        </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-enforcer-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <!-- Not important for this 
module as we only access Flink classes-->
+                                               <id>dependency-convergence</id>
+                                               <goals>
+                                                       <goal>enforce</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <skip>true</skip>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 
@@ -128,6 +158,45 @@ under the License.
                                </plugins>
                        </build>
                </profile>
+               <profile>
+                       <id>generate-config-docs</id>
+                       <activation>
+                               <property>
+                                       <name>generate-config-docs</name>
+                               </property>
+                       </activation>
+                       <build>
+                               <plugins>
+                                       <plugin>
+                                               
<artifactId>maven-antrun-plugin</artifactId>
+                                               <version>1.7</version>
+                                               <executions>
+                                                       <execution>
+                                                               
<phase>package</phase>
+                                                               <goals>
+                                                                       
<goal>run</goal>
+                                                               </goals>
+                                                       </execution>
+                                               </executions>
+                                               <configuration>
+                                                       <target>
+                                                               <mkdir 
dir="${rootDir}/${generated.docs.dir}"/>
+                                                               <java 
classname="org.apache.flink.docs.configuration.ConfigOptionsDocGenerator" 
fork="true">
+                                                                       
<classpath refid="maven.compile.classpath" />
+                                                                       <arg 
value="${rootDir}/${generated.docs.dir}/" />
+                                                                       <arg 
value="${rootDir}" />
+                                                                       
<!--packages with configuration classes-->
+                                                                       <arg 
value="flink-core" />
+                                                                       <arg 
value="org.apache.flink.configuration" />
+                                                                       <arg 
value="flink-yarn" />
+                                                                       <arg 
value="org.apache.flink.yarn.configuration" />
+                                                               </java>
+                                                       </target>
+                                               </configuration>
+                                       </plugin>
+                               </plugins>
+                       </build>
+               </profile>
        </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
new file mode 100644
index 0000000..7b8cdaf
--- /dev/null
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.docs.configuration;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigGroup;
+import org.apache.flink.configuration.ConfigGroups;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.WebOptions;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Class used for generating code based documentation of configuration 
parameters.
+ */
+public class ConfigOptionsDocGenerator {
+
+       /**
+        * This method generates HTML tables from a set of classes containing 
{@link ConfigOption ConfigOptions}.
+        *
+        * <p>For each class 1 or more html tables will be generated and placed 
into a separate file, depending on whether
+        * the class is annotated with {@link ConfigGroups}. The tables contain 
the key, default value and description for
+        * every {@link ConfigOption}.
+        *
+        * @param args
+        *  [0] output directory for the generated files
+        *  [1] project root directory
+        *  [x] module containing an *Options class
+        *  [x+1] package containing the *Options classes
+        */
+       public static void main(String[] args) throws IOException, 
ClassNotFoundException {
+               String outputDirectory = args[0];
+               String rootDir = args[1];
+               for (int x = 2; x + 1 < args.length; x += 2) {
+                       createTable(rootDir, args[x], args[x + 1], 
outputDirectory);
+               }
+       }
+
+       private static void createTable(String rootDir, String module, String 
packageName, String outputDirectory) throws IOException, ClassNotFoundException 
{
+               Path configDir = Paths.get(rootDir, module, "src/main/java", 
packageName.replaceAll("\\.", "/"));
+
+               Pattern p = Pattern.compile("(([a-zA-Z]*)(Options))\\.java");
+               try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(configDir, "*Options.java")) {
+                       for (Path entry : stream) {
+                               String fileName = 
entry.getFileName().toString();
+                               Matcher matcher = p.matcher(fileName);
+                               if (!fileName.equals("ConfigOptions.java") && 
matcher.matches()) {
+                                       Class<?> optionsClass = 
Class.forName(packageName + "." + matcher.group(1));
+                                       List<Tuple2<ConfigGroup, String>> 
tables = generateTablesForClass(optionsClass);
+                                       if (tables.size() > 0) {
+                                               for (Tuple2<ConfigGroup, 
String> group : tables) {
+
+                                                       String name = group.f0 
== null
+                                                               ? 
matcher.group(2).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase()
+                                                               : 
group.f0.name().replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase();
+
+                                                       String outputFile = 
name + "_configuration.html";
+                                                       
Files.write(Paths.get(outputDirectory, outputFile), 
group.f1.getBytes(StandardCharsets.UTF_8));
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       @VisibleForTesting
+       static List<Tuple2<ConfigGroup, String>> 
generateTablesForClass(Class<?> optionsClass) {
+               ConfigGroups configGroups = 
optionsClass.getAnnotation(ConfigGroups.class);
+               List<Tuple2<ConfigGroup, String>> tables = new ArrayList<>();
+               List<ConfigOption> allOptions = 
extractConfigOptions(optionsClass);
+
+               if (configGroups != null) {
+                       Tree tree = new Tree(configGroups.groups(), allOptions);
+
+                       for (ConfigGroup group : configGroups.groups()) {
+                               List<ConfigOption> configOptions = 
tree.findConfigOptions(group);
+                               sortOptions(configOptions);
+                               tables.add(Tuple2.of(group, 
toHtmlTable(configOptions)));
+                       }
+                       List<ConfigOption> configOptions = 
tree.getDefaultOptions();
+                       sortOptions(configOptions);
+                       tables.add(Tuple2.of(null, toHtmlTable(configOptions)));
+               } else {
+                       sortOptions(allOptions);
+                       tables.add(Tuple2.of(null, toHtmlTable(allOptions)));
+               }
+               return tables;
+       }
+
+       private static List<ConfigOption> extractConfigOptions(Class<?> clazz) {
+               try {
+                       List<ConfigOption> configOptions = new ArrayList<>();
+                       Field[] fields = clazz.getFields();
+                       for (Field field : fields) {
+                               if (field.getType().equals(ConfigOption.class) 
&& field.getAnnotation(Deprecated.class) == null) {
+                                       configOptions.add((ConfigOption) 
field.get(null));
+                               }
+                       }
+
+                       return configOptions;
+               } catch (Exception e) {
+                       throw new RuntimeException("Failed to extract config 
options from class " + clazz + ".", e);
+               }
+       }
+
+       /**
+        * Transforms the given options into an HTML formatted table.
+        * Options are rendered in list order; callers sort them by key beforehand.
+        *
+        * @param options list of options to include in this group
+        * @return string containing HTML formatted table
+        */
+       private static String toHtmlTable(final List<ConfigOption> options) {
+               StringBuilder htmlTable = new StringBuilder();
+               htmlTable.append("<table class=\"table table-bordered\">\n");
+               htmlTable.append("    <thead>\n");
+               htmlTable.append("        <tr>\n");
+               htmlTable.append("            <th class=\"text-left\" 
style=\"width: 20%\">Key</th>\n");
+               htmlTable.append("            <th class=\"text-left\" 
style=\"width: 15%\">Default Value</th>\n");
+               htmlTable.append("            <th class=\"text-left\" 
style=\"width: 65%\">Description</th>\n");
+               htmlTable.append("        </tr>\n");
+               htmlTable.append("    </thead>\n");
+               htmlTable.append("    <tbody>\n");
+
+               for (ConfigOption option : options) {
+                       htmlTable.append(toHtmlString(option));
+               }
+
+               htmlTable.append("    </tbody>\n");
+               htmlTable.append("</table>\n");
+
+               return htmlTable.toString();
+       }
+
+       /**
+        * Transforms option to table row.
+        *
+        * @param option option to transform
+        * @return row with the option description
+        */
+       private static String toHtmlString(final ConfigOption<?> option) {
+               Object defaultValue = option.defaultValue();
+               // This is a temporary hack that should be removed once 
FLINK-6490 is resolved.
+               // These options use System.getProperty("java.io.tmpdir") as 
the default.
+               // As a result the generated table contains an actual path as 
the default, which is simply wrong.
+               if (option == WebOptions.TMP_DIR || 
option.key().equals("python.dc.tmp.dir")) {
+                       defaultValue = null;
+               }
+               return "" +
+                       "        <tr>\n" +
+                       "            <td><h5>" + escapeCharacters(option.key()) 
+ "</h5></td>\n" +
+                       "            <td>" + 
escapeCharacters(defaultValueToHtml(defaultValue)) + "</td>\n" +
+                       "            <td>" + 
escapeCharacters(option.description()) + "</td>\n" +
+                       "        </tr>\n";
+       }
+
+       private static String defaultValueToHtml(Object value) {
+               if (value instanceof String) {
+                       if (((String) value).isEmpty()) {
+                               return "(none)";
+                       }
+                       return "\"" + value + "\"";
+               }
+
+               return value == null ? "(none)" : value.toString();
+       }
+
+       private static String escapeCharacters(String value) {
+               return value
+                       .replaceAll("<", "&#60;")
+                       .replaceAll(">", "&#62;");
+       }
+
+       private static void sortOptions(List<ConfigOption> configOptions) {
+               configOptions.sort(Comparator.comparing(ConfigOption::key));
+       }
+
+       /**
+        * Data structure used to assign {@link ConfigOption ConfigOptions} to 
the {@link ConfigGroup} with the longest
+        * matching prefix.
+        */
+       private static class Tree {
+               private final Node root = new Node();
+
+               Tree(ConfigGroup[] groups, Collection<ConfigOption> options) {
+                       // generate a tree based on all key prefixes
+                       for (ConfigGroup group : groups) {
+                               String[] keyComponents = 
group.keyPrefix().split("\\.");
+                               Node currentNode = root;
+                               for (String keyComponent : keyComponents) {
+                                       currentNode = 
currentNode.addChild(keyComponent);
+                               }
+                               currentNode.markAsGroupRoot();
+                       }
+
+                       // assign options to their corresponding group, i.e. 
the last group root node encountered when traversing
+                       // the tree based on the option key
+                       for (ConfigOption<?> option : options) {
+                               
findGroupRoot(option.key()).assignOption(option);
+                       }
+               }
+
+               List<ConfigOption> findConfigOptions(ConfigGroup configGroup) {
+                       Node groupRoot = findGroupRoot(configGroup.keyPrefix());
+                       return groupRoot.getConfigOptions();
+               }
+
+               List<ConfigOption> getDefaultOptions() {
+                       return root.getConfigOptions();
+               }
+
+               private Node findGroupRoot(String key) {
+                       String[] keyComponents = key.split("\\.");
+                       Node currentNode = root;
+                       for (String keyComponent : keyComponents) {
+                               currentNode = 
currentNode.findChild(keyComponent);
+                       }
+                       return currentNode.isGroupRoot() ? currentNode : root;
+               }
+
+               private static class Node {
+                       private final List<ConfigOption> configOptions = new 
ArrayList<>();
+                       private final Map<String, Node> children = new 
HashMap<>();
+                       private boolean isGroupRoot = false;
+
+                       private Node addChild(String keyComponent) {
+                               Node child = children.get(keyComponent);
+                               if (child == null) {
+                                       child = new Node();
+                                       children.put(keyComponent, child);
+                               }
+                               return child;
+                       }
+
+                       private Node findChild(String keyComponent) {
+                               Node child = children.get(keyComponent);
+                               if (child == null) {
+                                       return this;
+                               }
+                               return child;
+                       }
+
+                       private void assignOption(ConfigOption option) {
+                               configOptions.add(option);
+                       }
+
+                       private boolean isGroupRoot() {
+                               return isGroupRoot;
+                       }
+
+                       private void markAsGroupRoot() {
+                               this.isGroupRoot = true;
+                       }
+
+                       private List<ConfigOption> getConfigOptions() {
+                               return configOptions;
+                       }
+               }
+       }
+
+       private ConfigOptionsDocGenerator() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java
 
b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java
new file mode 100644
index 0000000..f0816f3
--- /dev/null
+++ 
b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigDocsCompletenessChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.docs.configuration;
+
+import org.apache.flink.configuration.ConfigConstants;
+
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+/**
+ * A small utility that collects all config keys that are not described in
+ * the configuration reference documentation.
+ *
+ * <p>Every public static {@code String} field of {@link ConfigConstants} whose name does not
+ * start with {@code DEFAULT} is treated as a config key. Keys whose value does not occur in
+ * the documentation file are printed to standard out.
+ */
+public class ConfigDocsCompletenessChecker {
+
+       /** Default location of the configuration reference, relative to the working directory. */
+       private static final String DEFAULT_CONFIG_DOCS_PATH = "docs/ops/config.md";
+
+       /**
+        * Runs the completeness check.
+        *
+        * @param args optionally, as first element, the path of the documentation file to check;
+        *             defaults to {@link #DEFAULT_CONFIG_DOCS_PATH}
+        * @throws Exception if the documentation file cannot be read or a field cannot be accessed
+        */
+       public static void main(String[] args) throws Exception {
+               final String docsPath = args != null && args.length > 0 ? args[0] : DEFAULT_CONFIG_DOCS_PATH;
+
+               // read with an explicit encoding instead of relying on the platform default
+               final String configFileContents = FileUtils.readFileToString(new File(docsPath), "UTF-8");
+
+               for (Field field : ConfigConstants.class.getFields()) {
+                       if (!isConfigKeyField(field)) {
+                               continue;
+                       }
+
+                       final Object val = field.get(null);
+                       // a null constant carries no key to search for, and String.contains(null) would throw
+                       if (val != null && !configFileContents.contains((String) val)) {
+                               System.out.println("++++ " + val + " is not mentioned in the configuration file!!!");
+                       }
+               }
+       }
+
+       /**
+        * Tells whether the given field declares a config key, i.e. is a static {@code String}
+        * field whose name does not start with {@code DEFAULT}.
+        */
+       private static boolean isConfigKeyField(Field field) {
+               return Modifier.isStatic(field.getModifiers())
+                       && field.getType().equals(String.class)
+                       && !field.getName().startsWith("DEFAULT");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
 
b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
new file mode 100644
index 0000000..b7b2ba6
--- /dev/null
+++ 
b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.docs.configuration;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigGroup;
+import org.apache.flink.configuration.ConfigGroups;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link ConfigOptionsDocGenerator}.
+ *
+ * <p>The expected HTML is assembled from {@link #table(String...)} and
+ * {@link #row(String, String, String)} so that the table frame is spelled out only once.
+ */
+public class ConfigOptionsDocGeneratorTest {
+
+       /** Opening of every expected table, up to and including the {@code <tbody>} tag. */
+       private static final String TABLE_HEAD =
+               "<table class=\"table table-bordered\">\n" +
+               "    <thead>\n" +
+               "        <tr>\n" +
+               "            <th class=\"text-left\" style=\"width: 20%\">Key</th>\n" +
+               "            <th class=\"text-left\" style=\"width: 15%\">Default Value</th>\n" +
+               "            <th class=\"text-left\" style=\"width: 65%\">Description</th>\n" +
+               "        </tr>\n" +
+               "    </thead>\n" +
+               "    <tbody>\n";
+
+       /** Closing of every expected table, starting at the {@code </tbody>} tag. */
+       private static final String TABLE_TAIL =
+               "    </tbody>\n" +
+               "</table>\n";
+
+       /** Builds the expected HTML of a single table row. */
+       private static String row(String key, String defaultValue, String description) {
+               return
+                       "        <tr>\n" +
+                       "            <td><h5>" + key + "</h5></td>\n" +
+                       "            <td>" + defaultValue + "</td>\n" +
+                       "            <td>" + description + "</td>\n" +
+                       "        </tr>\n";
+       }
+
+       /** Builds the expected HTML of a complete table containing the given rows in order. */
+       private static String table(String... rows) {
+               final StringBuilder html = new StringBuilder(TABLE_HEAD);
+               for (String row : rows) {
+                       html.append(row);
+               }
+               return html.append(TABLE_TAIL).toString();
+       }
+
+       /** Options class without any {@link ConfigGroups} annotation. */
+       static class TestConfigGroup {
+               public static ConfigOption<Integer> firstOption = ConfigOptions
+                       .key("first.option.a")
+                       .defaultValue(2)
+                       .withDescription("This is example description for the first option.");
+
+               public static ConfigOption<String> secondOption = ConfigOptions
+                       .key("second.option.a")
+                       .noDefaultValue()
+                       .withDescription("This is long example description for the second option.");
+       }
+
+       @Test
+       public void testCreatingDescription() {
+               final String expectedTable = table(
+                       row("first.option.a", "2", "This is example description for the first option."),
+                       row("second.option.a", "(none)", "This is long example description for the second option."));
+
+               final String htmlTable = ConfigOptionsDocGenerator.generateTablesForClass(TestConfigGroup.class).get(0).f1;
+
+               assertEquals(expectedTable, htmlTable);
+       }
+
+       /** Options class with two declared groups; two of its four options match neither prefix. */
+       @ConfigGroups(groups = {
+               @ConfigGroup(name = "firstGroup", keyPrefix = "first"),
+               @ConfigGroup(name = "secondGroup", keyPrefix = "second")})
+       static class TestConfigMultipleSubGroup {
+               public static ConfigOption<Integer> firstOption = ConfigOptions
+                       .key("first.option.a")
+                       .defaultValue(2)
+                       .withDescription("This is example description for the first option.");
+
+               public static ConfigOption<String> secondOption = ConfigOptions
+                       .key("second.option.a")
+                       .noDefaultValue()
+                       .withDescription("This is long example description for the second option.");
+
+               public static ConfigOption<Integer> thirdOption = ConfigOptions
+                       .key("third.option.a")
+                       .defaultValue(2)
+                       .withDescription("This is example description for the third option.");
+
+               public static ConfigOption<String> fourthOption = ConfigOptions
+                       .key("fourth.option.a")
+                       .noDefaultValue()
+                       .withDescription("This is long example description for the fourth option.");
+       }
+
+       @Test
+       public void testCreatingMultipleGroups() {
+               final List<Tuple2<ConfigGroup, String>> tables = ConfigOptionsDocGenerator.generateTablesForClass(
+                       TestConfigMultipleSubGroup.class);
+
+               // expected value first, so that a failure reports expected/actual the right way round
+               assertEquals(3, tables.size());
+
+               // index the tables by group name; a table without a group is filed under "default"
+               final HashMap<String, String> tablesConverted = new HashMap<>();
+               for (Tuple2<ConfigGroup, String> table : tables) {
+                       final String groupName = table.f0 != null ? table.f0.name() : "default";
+                       tablesConverted.put(groupName, table.f1);
+               }
+
+               assertEquals(
+                       table(row("first.option.a", "2", "This is example description for the first option.")),
+                       tablesConverted.get("firstGroup"));
+               assertEquals(
+                       table(row("second.option.a", "(none)", "This is long example description for the second option.")),
+                       tablesConverted.get("secondGroup"));
+               // the ungrouped options are expected with "fourth" before "third", not in declaration order
+               assertEquals(
+                       table(
+                               row("fourth.option.a", "(none)", "This is long example description for the fourth option."),
+                               row("third.option.a", "2", "This is example description for the third option.")),
+                       tablesConverted.get("default"));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-libraries/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/pom.xml 
b/flink-libraries/flink-python/pom.xml
index eaa5b87..a8a1d0b 100644
--- a/flink-libraries/flink-python/pom.xml
+++ b/flink-libraries/flink-python/pom.xml
@@ -105,7 +105,7 @@ under the License.
                                                <configuration>
                                                        <target>
                                                                <mkdir 
dir="${rootDir}/${generated.docs.dir}"/>
-                                                               <java 
classname="org.apache.flink.configuration.ConfigOptionsDocGenerator" 
fork="true">
+                                                               <java 
classname="org.apache.flink.docs.configuration.ConfigOptionsDocGenerator" 
fork="true">
                                                                        
<classpath refid="maven.compile.classpath" />
                                                                        <arg 
value="${rootDir}/${generated.docs.dir}/" />
                                                                        
<!--package with configuration classes-->

http://git-wip-us.apache.org/repos/asf/flink/blob/2a612d9d/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 9aaae02..aa5a00a 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -134,38 +134,6 @@ under the License.
 
        <profiles>
                <profile>
-                       <id>generate-config-docs</id>
-
-                       <build>
-                               <plugins>
-                                       <plugin>
-                                               
<artifactId>maven-antrun-plugin</artifactId>
-                                               <version>1.7</version>
-                                               <executions>
-                                                       <execution>
-                                                               
<phase>package</phase>
-                                                               <goals>
-                                                                       
<goal>run</goal>
-                                                               </goals>
-                                                       </execution>
-                                               </executions>
-                                               <configuration>
-                                                       <target>
-                                                               <mkdir 
dir="${rootDir}/${generated.docs.dir}"/>
-                                                               <java 
classname="org.apache.flink.configuration.ConfigOptionsDocGenerator" 
fork="true">
-                                                                       
<classpath refid="maven.compile.classpath" />
-                                                                       <arg 
value="${rootDir}/${generated.docs.dir}/" />
-                                                                       
<!--package with configuration classes-->
-                                                                       <arg 
value="org.apache.flink.yarn.configuration" />
-                                                               </java>
-                                                       </target>
-                                               </configuration>
-                                       </plugin>
-                               </plugins>
-                       </build>
-               </profile>
-
-               <profile>
                        <!-- Hadoop >= 2.6 moved the S3 file systems from 
hadoop-common into hadoop-aws artifact
                                (see 
https://issues.apache.org/jira/browse/HADOOP-11074)
                                We can add the (test) dependency per default 
once 2.6 is the minimum required version.

Reply via email to