[FLINK-7301] [docs] Rework state documentation

This closes #4441.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47070674
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47070674
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47070674

Branch: refs/heads/master
Commit: 470706740d5d89de69844a5662166ce94d71d00d
Parents: 2ed74ca
Author: twalthr <twal...@apache.org>
Authored: Mon Jul 31 20:14:31 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Aug 8 14:05:51 2017 +0200

----------------------------------------------------------------------
 docs/check_links.sh                           |   5 +-
 docs/concepts/programming-model.md            |   4 +-
 docs/concepts/runtime.md                      |  10 +-
 docs/dev/api_concepts.md                      |   4 +-
 docs/dev/batch/connectors.md                  |   2 +-
 docs/dev/batch/python.md                      |   2 +-
 docs/dev/cluster_execution.md                 |   2 +-
 docs/dev/connectors/index.md                  |   2 +-
 docs/dev/connectors/kafka.md                  |   2 +-
 docs/dev/datastream_api.md                    |   4 +-
 docs/dev/execution_configuration.md           |   2 +-
 docs/dev/libs/storm_compatibility.md          |   2 +-
 docs/dev/linking_with_flink.md                |   2 +-
 docs/dev/local_execution.md                   |   2 +-
 docs/dev/migration.md                         |   8 +-
 docs/dev/packaging.md                         |   2 +-
 docs/dev/parallel.md                          |   4 +-
 docs/dev/stream/checkpointing.md              | 174 -----
 docs/dev/stream/process_function.md           |   2 +-
 docs/dev/stream/queryable_state.md            | 291 --------
 docs/dev/stream/state.md                      | 768 ---------------------
 docs/dev/stream/state/checkpointing.md        | 174 +++++
 docs/dev/stream/state/custom_serialization.md | 188 +++++
 docs/dev/stream/state/index.md                |  56 ++
 docs/dev/stream/state/queryable_state.md      | 292 ++++++++
 docs/dev/stream/state/state.md                | 596 ++++++++++++++++
 docs/dev/stream/state/state_backends.md       |  46 ++
 docs/dev/table/index.md                       |   2 +-
 docs/internals/components.md                  |   4 +-
 docs/internals/stream_checkpointing.md        |   8 +-
 docs/internals/task_lifecycle.md              |   4 +-
 docs/monitoring/debugging_classloading.md     |   2 +-
 docs/monitoring/historyserver.md              |   2 +-
 docs/monitoring/large_state_tuning.md         | 237 -------
 docs/ops/README.md                            |  21 -
 docs/ops/cli.md                               | 355 ++++++++++
 docs/ops/config.md                            | 713 +++++++++++++++++++
 docs/ops/deployment/aws.md                    | 374 ++++++++++
 docs/ops/deployment/cluster_setup.md          | 151 ++++
 docs/ops/deployment/docker.md                 | 102 +++
 docs/ops/deployment/gce_setup.md              |  93 +++
 docs/ops/deployment/index.md                  |  24 +
 docs/ops/deployment/kubernetes.md             | 157 +++++
 docs/ops/deployment/mapr_setup.md             | 132 ++++
 docs/ops/deployment/mesos.md                  | 269 ++++++++
 docs/ops/deployment/yarn_setup.md             | 338 +++++++++
 docs/ops/index.md                             |  25 +
 docs/ops/jobmanager_high_availability.md      | 239 +++++++
 docs/ops/production_ready.md                  |   6 +-
 docs/ops/security-kerberos.md                 |   8 +-
 docs/ops/security-ssl.md                      | 144 ++++
 docs/ops/state/checkpoints.md                 | 101 +++
 docs/ops/state/index.md                       |  24 +
 docs/ops/state/large_state_tuning.md          | 237 +++++++
 docs/ops/state/savepoints.md                  | 198 ++++++
 docs/ops/state/state_backends.md              | 169 +++++
 docs/ops/state_backends.md                    | 169 -----
 docs/ops/upgrading.md                         |  14 +-
 docs/quickstart/setup_quickstart.md           |   2 +-
 docs/redirects/cli.md                         |   2 +-
 docs/redirects/fault_tolerance.md             |   2 +-
 docs/redirects/savepoints.md                  |   2 +-
 docs/redirects/state.md                       |   2 +-
 docs/redirects/state_backends.md              |   2 +-
 docs/setup/aws.md                             | 374 ----------
 docs/setup/building.md                        | 149 ----
 docs/setup/checkpoints.md                     | 101 ---
 docs/setup/cli.md                             | 355 ----------
 docs/setup/cluster_setup.md                   | 151 ----
 docs/setup/config.md                          | 713 -------------------
 docs/setup/deployment.md                      |  24 -
 docs/setup/docker.md                          | 102 ---
 docs/setup/flink_on_windows.md                |  86 ---
 docs/setup/gce_setup.md                       |  93 ---
 docs/setup/index.md                           |  25 -
 docs/setup/jobmanager_high_availability.md    | 239 -------
 docs/setup/kubernetes.md                      | 157 -----
 docs/setup/mapr_setup.md                      | 132 ----
 docs/setup/mesos.md                           | 269 --------
 docs/setup/savepoints.md                      | 198 ------
 docs/setup/security-ssl.md                    | 144 ----
 docs/setup/yarn_setup.md                      | 338 ---------
 docs/start/building.md                        | 149 ++++
 docs/start/flink_on_windows.md                |  86 +++
 84 files changed, 5495 insertions(+), 5370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/check_links.sh
----------------------------------------------------------------------
diff --git a/docs/check_links.sh b/docs/check_links.sh
index c4307a5..5d9f762 100755
--- a/docs/check_links.sh
+++ b/docs/check_links.sh
@@ -31,6 +31,9 @@ fi
 # Fail the build if any broken links are found
 broken_links_str=$(grep -e 'Found [[:digit:]]\+ broken links' spider.log)
 if [ -n "$broken_links_str" ]; then
-    echo -e "\e[1;31m$broken_links_str\e[0m"
+    grep -B 1 "Remote file does not exist -- broken link!!!" spider.log
+    echo "---------------------------------------------------------------------------"
+    echo -e "$broken_links_str"
+    echo "Search for page containing broken link using 'grep -R BROKEN_PATH DOCS_DIR'"
     exit 1
 fi

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/concepts/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/concepts/programming-model.md 
b/docs/concepts/programming-model.md
index 7b0cfb5..fd5ebee 100644
--- a/docs/concepts/programming-model.md
+++ b/docs/concepts/programming-model.md
@@ -171,7 +171,7 @@ This alignment also allows Flink to redistribute the state 
and adjust the stream
 
 <img src="../fig/state_partitioning.svg" alt="State and Partitioning" 
class="offset" width="50%" />
 
-For more information, see the documentation on [working with 
state](../dev/stream/state.html).
+For more information, see the documentation on 
[state](../dev/stream/state/index.html).
 
 {% top %}
 
@@ -188,7 +188,7 @@ of events that need to be replayed).
 
 The description of the [fault tolerance internals]({{ site.baseurl 
}}/internals/stream_checkpointing.html) provides
 more information about how Flink manages checkpoints and related topics.
-Details about enabling and configuring checkpointing are in the [checkpointing 
API docs](../dev/stream/checkpointing.html).
+Details about enabling and configuring checkpointing are in the [checkpointing 
API docs](../dev/stream/state/checkpointing.html).
 
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/concepts/runtime.md
----------------------------------------------------------------------
diff --git a/docs/concepts/runtime.md b/docs/concepts/runtime.md
index c598b12..cb6d58f 100644
--- a/docs/concepts/runtime.md
+++ b/docs/concepts/runtime.md
@@ -54,8 +54,8 @@ The Flink runtime consists of two types of processes:
 
     There must always be at least one TaskManager.
 
-The JobManagers and TaskManagers can be started in various ways: directly on 
the machines as a [standalone cluster](../setup/cluster_setup.html), in
-containers, or managed by resource frameworks like 
[YARN](../setup/yarn_setup.html) or [Mesos](../setup/mesos.html).
+The JobManagers and TaskManagers can be started in various ways: directly on 
the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in
+containers, or managed by resource frameworks like 
[YARN](../ops/deployment/yarn_setup.html) or 
[Mesos](../ops/deployment/mesos.html).
 TaskManagers connect to JobManagers, announcing themselves as available, and 
are assigned work.
 
 The **client** is not part of the runtime and program execution, but is used 
to prepare and send a dataflow to the JobManager.
@@ -107,7 +107,7 @@ With hyper-threading, each slot then takes 2 or more 
hardware thread contexts.
 
 ## State Backends
 
-The exact data structures in which the key/value indexes are stored depend on the chosen [state backend](../ops/state_backends.html). One state backend
+The exact data structures in which the key/value indexes are stored depend on the chosen [state backend](../ops/state/state_backends.html). One state backend
 stores data in an in-memory hash map, another state backend uses 
[RocksDB](http://rocksdb.org) as the key/value store.
 In addition to defining the data structure that holds the state, the state 
backends also implement the logic to
 take a point-in-time snapshot of the key/value state and store that snapshot 
as part of a checkpoint.
@@ -120,8 +120,8 @@ take a point-in-time snapshot of the key/value state and 
store that snapshot as
 
 Programs written in the Data Stream API can resume execution from a 
**savepoint**. Savepoints allow both updating your programs and your Flink 
cluster without losing any state. 
 
-[Savepoints](..//setup/savepoints.html) are **manually triggered 
checkpoints**, which take a snapshot of the program and write it out to a state 
backend. They rely on the regular checkpointing mechanism for this. During 
execution programs are periodically snapshotted on the worker nodes and produce 
checkpoints. For recovery only the last completed checkpoint is needed and 
older checkpoints can be safely discarded as soon as a new one is completed.
+[Savepoints](../ops/state/savepoints.html) are **manually triggered 
checkpoints**, which take a snapshot of the program and write it out to a state 
backend. They rely on the regular checkpointing mechanism for this. During 
execution programs are periodically snapshotted on the worker nodes and produce 
checkpoints. For recovery only the last completed checkpoint is needed and 
older checkpoints can be safely discarded as soon as a new one is completed.
 
-Savepoints are similar to these periodic checkpoints except that they are 
**triggered by the user** and **don't automatically expire** when newer 
checkpoints are completed. Savepoints can be created from the [command 
line](../setup/cli.html#savepoints) or when cancelling a job via the [REST 
API](../monitoring/rest_api.html#cancel-job-with-savepoint).
+Savepoints are similar to these periodic checkpoints except that they are 
**triggered by the user** and **don't automatically expire** when newer 
checkpoints are completed. Savepoints can be created from the [command 
line](../ops/cli.html#savepoints) or when cancelling a job via the [REST 
API](../monitoring/rest_api.html#cancel-job-with-savepoint).
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/api_concepts.md
----------------------------------------------------------------------
diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index b3618aa..c447f27 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -100,7 +100,7 @@ will do the right thing depending on the context: if you 
are executing
 your program inside an IDE or as a regular Java program it will create
 a local environment that will execute your program on your local machine. If
 you created a JAR file from your program, and invoke it through the
-[command line]({{ site.baseurl }}/setup/cli.html), the Flink cluster manager
+[command line]({{ site.baseurl }}/ops/cli.html), the Flink cluster manager
 will execute your main method and `getExecutionEnvironment()` will return
 an execution environment for executing your program on a cluster.
 
@@ -169,7 +169,7 @@ will do the right thing depending on the context: if you 
are executing
 your program inside an IDE or as a regular Java program it will create
 a local environment that will execute your program on your local machine. If
 you created a JAR file from your program, and invoke it through the
-[command line]({{ site.baseurl }}/apis/cli.html), the Flink cluster manager
+[command line]({{ site.baseurl }}/ops/cli.html), the Flink cluster manager
 will execute your main method and `getExecutionEnvironment()` will return
 an execution environment for executing your program on a cluster.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/batch/connectors.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/connectors.md b/docs/dev/batch/connectors.md
index b7a0718..93bbf72 100644
--- a/docs/dev/batch/connectors.md
+++ b/docs/dev/batch/connectors.md
@@ -58,7 +58,7 @@ In order to use a Hadoop file system with Flink, make sure 
that
 
 #### Amazon S3
 
-See [Deployment & Operations - Deployment - AWS - S3: Simple Storage 
Service]({{ site.baseurl }}/setup/aws.html) for available S3 file system 
implementations, their configuration and required libraries.
+See [Deployment & Operations - Deployment - AWS - S3: Simple Storage 
Service]({{ site.baseurl }}/ops/deployment/aws.html) for available S3 file 
system implementations, their configuration and required libraries.
 
 #### Alluxio
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md
index c4c2671..0383f54 100644
--- a/docs/dev/batch/python.md
+++ b/docs/dev/batch/python.md
@@ -615,7 +615,7 @@ env.execute()
 
 A system-wide default parallelism for all execution environments can be 
defined by setting the
 `parallelism.default` property in `./conf/flink-conf.yaml`. See the
-[Configuration]({{ site.baseurl }}/setup/config.html) documentation for 
details.
+[Configuration]({{ site.baseurl }}/ops/config.html) documentation for details.
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/dev/cluster_execution.md b/docs/dev/cluster_execution.md
index d614846..03af637 100644
--- a/docs/dev/cluster_execution.md
+++ b/docs/dev/cluster_execution.md
@@ -33,7 +33,7 @@ are two ways to send a program to a cluster for execution:
 The command line interface lets you submit packaged programs (JARs) to a 
cluster
 (or single machine setup).
 
-Please refer to the [Command Line Interface]({{ site.baseurl 
}}/setup/cli.html) documentation for
+Please refer to the [Command Line Interface]({{ site.baseurl }}/ops/cli.html) 
documentation for
 details.
 
 ## Remote Environment

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index 177e601..00c0853 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -80,5 +80,5 @@ When a Flink application pushes a lot of data to an external 
data store, this
 can become an I/O bottleneck.
 If the data involved has many fewer reads than writes, a better approach can be
 for an external application to pull from Flink the data it needs.
-The [Queryable State]({{ site.baseurl }}/dev/stream/queryable_state.html) 
interface
+The [Queryable State]({{ site.baseurl 
}}/dev/stream/state/queryable_state.html) interface
 enables this by allowing the state being managed by Flink to be queried on 
demand.

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index d4e8978..042ad11 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -573,5 +573,5 @@ Once Kerberos-based Flink security is enabled, you can 
authenticate to Kafka wit
 When using standalone Flink deployment, you can also use `SASL_SSL`; please 
see how to configure the Kafka client for SSL 
[here](https://kafka.apache.org/documentation/#security_configclients). 
 - Set `sasl.kerberos.service.name` to `kafka` (default `kafka`): The value for 
this should match the `sasl.kerberos.service.name` used for Kafka broker 
configurations. A mismatch in service name between client and server 
configuration will cause the authentication to fail.
 
-For more information on Flink configuration for Kerberos security, please see 
[here]({{ site.baseurl}}/setup/config.html).
+For more information on Flink configuration for Kerberos security, please see 
[here]({{ site.baseurl}}/ops/config.html).
 You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally sets up Kerberos-based security.

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 7191d82..8b3899b 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -1158,7 +1158,7 @@ previous transformation. For example, you can use 
`someStream.map(...).startNewC
 you cannot use `someStream.startNewChain()`.
 
 A resource group is a slot in Flink, see
-[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots).
 You can
+[slots]({{site.baseurl}}/ops/config.html#configuring-taskmanager-processing-slots).
 You can
 manually isolate operators in separate slots if desired.
 
 <div class="codetabs" markdown="1">
@@ -1604,7 +1604,7 @@ for an explanation of most parameters. These parameters 
pertain specifically to
 
 ### Fault Tolerance
 
-[State & Checkpointing]({{ site.baseurl }}/dev/stream/checkpointing.html) 
describes how to enable and configure Flink's checkpointing mechanism.
+[State & Checkpointing]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) describes how to enable and configure 
Flink's checkpointing mechanism.
 
 ### Controlling Latency
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/execution_configuration.md
----------------------------------------------------------------------
diff --git a/docs/dev/execution_configuration.md 
b/docs/dev/execution_configuration.md
index 94e788c..2316450 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -23,7 +23,7 @@ under the License.
 -->
 
 The `StreamExecutionEnvironment` contains the `ExecutionConfig` which allows 
to set job specific configuration values for the runtime.
-To change the defaults that affect all jobs, see [Configuration]({{ 
site.baseurl }}/setup/config.html).
+To change the defaults that affect all jobs, see [Configuration]({{ 
site.baseurl }}/ops/config.html).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/libs/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/storm_compatibility.md 
b/docs/dev/libs/storm_compatibility.md
index 89d7706..6b24dc0 100644
--- a/docs/dev/libs/storm_compatibility.md
+++ b/docs/dev/libs/storm_compatibility.md
@@ -134,7 +134,7 @@ DataStream<String> rawInput = env.addSource(
 
 If a Spout emits a finite number of tuples, `SpoutWrapper` can be configured to terminate automatically by setting the `numberOfInvocations` parameter in its constructor.
 This allows the Flink program to shut down automatically after all data is 
processed.
-Per default the program will run until it is 
[canceled]({{site.baseurl}}/setup/cli.html) manually.
+Per default the program will run until it is 
[canceled]({{site.baseurl}}/ops/cli.html) manually.
 
 
 ## Embed Bolts

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/linking_with_flink.md
----------------------------------------------------------------------
diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md
index 73ca677..3f55b9e 100644
--- a/docs/dev/linking_with_flink.md
+++ b/docs/dev/linking_with_flink.md
@@ -126,7 +126,7 @@ to run your program on Flink with Scala 2.11, you need to 
add a `_2.11` suffix t
 values of the Flink modules in your dependencies section.
 
 If you are looking for building Flink with Scala 2.11, please check
-[build guide]({{ site.baseurl }}/setup/building.html#scala-versions).
+[build guide]({{ site.baseurl }}/start/building.html#scala-versions).
 
 #### Hadoop Dependency Versions
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/dev/local_execution.md b/docs/dev/local_execution.md
index 45a39e3..cf89956 100644
--- a/docs/dev/local_execution.md
+++ b/docs/dev/local_execution.md
@@ -57,7 +57,7 @@ The `LocalEnvironment` is a handle to local execution for 
Flink programs. Use it
 
 The local environment is instantiated via the method 
`ExecutionEnvironment.createLocalEnvironment()`. By default, it will use as 
many local threads for execution as your machine has CPU cores (hardware 
contexts). You can alternatively specify the desired parallelism. The local 
environment can be configured to log to the console using 
`enableLogging()`/`disableLogging()`.
 
-In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the 
even better way to go. That method returns a `LocalEnvironment` when the 
program is started locally (outside the command line interface), and it returns 
a pre-configured environment for cluster execution, when the program is invoked 
by the [command line interface]({{ site.baseurl }}/setup/cli.html).
+In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the 
even better way to go. That method returns a `LocalEnvironment` when the 
program is started locally (outside the command line interface), and it returns 
a pre-configured environment for cluster execution, when the program is invoked 
by the [command line interface]({{ site.baseurl }}/ops/cli.html).
 
 ~~~java
 public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index cf8c8f8..3369a2c 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -37,7 +37,7 @@ This would be relevant mostly for users implementing custom 
`TypeSerializer`s fo
 
 Since Flink 1.3, two additional methods have been added that are related to 
serializer compatibility
 across savepoint restores. Please see
-[Handling serializer upgrades and compatibility]({{ site.baseurl 
}}/dev/stream/state.html#handling-serializer-upgrades-and-compatibility)
+[Handling serializer upgrades and compatibility]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility)
 for further details on how to implement these methods.
 
 ### `ProcessFunction` is always a `RichFunction`
@@ -75,7 +75,7 @@ For other custom projects, make sure to add logger 
dependencies. For example, in
 
 ## Migrating from Flink 1.1 to Flink 1.2
 
-As mentioned in the [State documentation]({{ site.baseurl 
}}/dev/stream/state.html), Flink has two types of state:
+As mentioned in the [State documentation]({{ site.baseurl 
}}/dev/stream/state/state.html), Flink has two types of state:
 **keyed** and **non-keyed** state (also called **operator** state). Both types 
are available to
 both operators and user-defined functions. This document will guide you 
through the process of migrating your Flink 1.1
 function code to Flink 1.2 and will present some important internal changes 
introduced in Flink 1.2 that concern the
@@ -89,7 +89,7 @@ The migration process will serve two goals:
 Flink 1.1 predecessor.
 
 After following the steps in this guide, you will be able to migrate your 
running job from Flink 1.1 to Flink 1.2
-simply by taking a [savepoint]({{ site.baseurl }}/setup/savepoints.html) with 
your Flink 1.1 job and giving it to
+simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) 
with your Flink 1.1 job and giving it to
 your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to 
resume execution from where its
 Flink 1.1 predecessor left off.
 
@@ -203,7 +203,7 @@ contains elements `(test1, 2)` and `(test2, 2)`, when 
increasing the parallelism
 while `(test2, 2)` will go to task 1.
 
 More details on the principles behind rescaling of both keyed state and 
non-keyed state can be found in
-the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
+the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html).
 
 ##### ListCheckpointed
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/packaging.md
----------------------------------------------------------------------
diff --git a/docs/dev/packaging.md b/docs/dev/packaging.md
index ee351ae..e83d9ac 100644
--- a/docs/dev/packaging.md
+++ b/docs/dev/packaging.md
@@ -27,7 +27,7 @@ under the License.
 As described earlier, Flink programs can be executed on
 clusters by using a `remote environment`. Alternatively, programs can be 
packaged into JAR Files
 (Java Archives) for execution. Packaging the program is a prerequisite to 
executing them through the
-[command line interface]({{ site.baseurl }}/setup/cli.html).
+[command line interface]({{ site.baseurl }}/ops/cli.html).
 
 ### Packaging Programs
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/parallel.md
----------------------------------------------------------------------
diff --git a/docs/dev/parallel.md b/docs/dev/parallel.md
index 549481f..ae6f863 100644
--- a/docs/dev/parallel.md
+++ b/docs/dev/parallel.md
@@ -27,7 +27,7 @@ program consists of multiple tasks 
(transformations/operators, data sources, and
 several parallel instances for execution and each parallel instance processes 
a subset of the task's
 input data. The number of parallel instances of a task is called its 
*parallelism*.
 
-If you want to use [savepoints]({{ site.baseurl }}/setup/savepoints.html) you 
should also consider
+If you want to use [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) 
you should also consider
 setting a maximum parallelism (or `max parallelism`). When restoring from a 
savepoint you can
 change the parallelism of specific operators or the whole program and this 
setting specifies
 an upper bound on the parallelism. This is required because Flink internally 
partitions state
@@ -181,7 +181,7 @@ try {
 
 A system-wide default parallelism for all execution environments can be 
defined by setting the
 `parallelism.default` property in `./conf/flink-conf.yaml`. See the
-[Configuration]({{ site.baseurl }}/setup/config.html) documentation for 
details.
+[Configuration]({{ site.baseurl }}/ops/config.html) documentation for details.
 
 ## Setting the Maximum Parallelism
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/checkpointing.md b/docs/dev/stream/checkpointing.md
deleted file mode 100644
index 4fe06c1..0000000
--- a/docs/dev/stream/checkpointing.md
+++ /dev/null
@@ -1,174 +0,0 @@
----
-title: "Checkpointing"
-nav-parent_id: streaming
-nav-pos: 50
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* ToC
-{:toc}
-
-Every function and operator in Flink can be **stateful** (see [working with 
state](state.html) for details).
-Stateful functions store data across the processing of individual 
elements/events, making state a critical building block for
-any type of more elaborate operation.
-
-In order to make state fault tolerant, Flink needs to **checkpoint** the 
state. Checkpoints allow Flink to recover state and positions
-in the streams to give the application the same semantics as a failure-free 
execution.
-
-The [documentation on streaming fault 
tolerance](../../internals/stream_checkpointing.html) describes in detail the 
technique behind Flink's streaming fault tolerance mechanism.
-
-
-## Prerequisites
-
-Flink's checkpointing mechanism interacts with durable storage for streams and 
state. In general, it requires:
-
-  - A *persistent* (or *durable*) data source that can replay records for a certain amount of time. Examples for such sources are persistent message queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...).
-  - A persistent storage for state, typically a distributed filesystem (e.g., 
HDFS, S3, GFS, NFS, Ceph, ...)
-
-
-## Enabling and Configuring Checkpointing
-
-By default, checkpointing is disabled. To enable checkpointing, call 
`enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the 
checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-  - *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
-    Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
-
-  - *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete by then.
-
-  - *minimum time between checkpoints*: To make sure that the streaming 
application makes a certain amount of progress between checkpoints,
-    one can define how much time needs to pass between checkpoints. If this 
value is set for example to *5000*, the next checkpoint will be
-    started no sooner than 5 seconds after the previous checkpoint completed, 
regardless of the checkpoint duration and the checkpoint interval.
-    Note that this implies that the checkpoint interval will never be smaller 
than this parameter.
-    
-    It is often easier to configure applications by defining the "time between checkpoints" than the checkpoint interval, because the "time between checkpoints"
-    is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow).
-
-    Note that this value also implies that the number of concurrent 
checkpoints is *one*.
-
-  - *number of concurrent checkpoints*: By default, the system will not 
trigger another checkpoint while one is still in progress.
-    This ensures that the topology does not spend too much time on checkpoints instead of making progress with processing the streams.
-    It is possible to allow for multiple overlapping checkpoints, which is 
interesting for pipelines that have a certain processing delay
-    (for example because the functions call external services that need some 
time to respond) but that still want to do very frequent checkpoints
-    (100s of milliseconds) to re-process very little upon failures.
-
-    This option cannot be used when a minimum time between checkpoints is 
defined.
-
-  - *externalized checkpoints*: You can configure periodic checkpoints to be 
persisted externally. Externalized checkpoints write their meta data out to 
persistent storage and are *not* automatically cleaned up when the job fails. 
This way, you will have a checkpoint around to resume from if your job fails. 
There are more details in the [deployment notes on externalized 
checkpoints](../../setup/checkpoints.html#externalized-checkpoints).
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000);
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-
-// make sure 500 ms of progress happen between checkpoints
-env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig().setCheckpointTimeout(60000);
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-
-// enable externalized checkpoints which are retained after job cancellation
-env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000)
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-
-// make sure 500 ms of progress happen between checkpoints
-env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig.setCheckpointTimeout(60000)
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
-{% endhighlight %}
-</div>
-</div>
-
-### Related Config Options
-
-Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` 
(see [configuration](config.html) for a full guide):
-
-- `state.backend`: The backend that will be used to store operator state 
checkpoints if checkpointing is enabled. Supported backends:
-   -  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
-   -  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
-
-- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
-
-- `state.backend.rocksdb.checkpointdir`:  The local directory for storing 
RocksDB files, or a list of directories separated by the systems directory 
delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is 
`taskmanager.tmp.dirs`)
-
-- `state.checkpoints.dir`: The target directory for meta data of [externalized 
checkpoints](../../setup/checkpoints.html#externalized-checkpoints).
-
-- `state.checkpoints.num-retained`: The number of completed checkpoint 
instances to retain. Having more than one allows recovery fallback to an 
earlier checkpoints if the latest checkpoint is corrupt. (Default: 1)
-
-{% top %}
-
-
-## Selecting a State Backend
-
-Flink's [checkpointing mechanism]({{ site.baseurl 
}}/internals/stream_checkpointing.html) stores consistent snapshots
-of all the state in timers and stateful operators, including connectors, 
windows, and any [user-defined state](state.html).
-Where the checkpoints are stored (e.g., JobManager memory, file system, 
database) depends on the configured
-**State Backend**. 
-
-By default, state is kept in memory in the TaskManagers and checkpoints are 
stored in memory in the JobManager. For proper persistence of large state,
-Flink supports various approaches for storing and checkpointing state in other 
state backends. The choice of state backend can be configured via 
`StreamExecutionEnvironment.setStateBackend(…)`.
-
-See [state backends](../../ops/state_backends.html) for more details on the 
available state backends and options for job-wide and cluster-wide 
configuration.
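
For illustration, a minimal sketch of selecting a state backend programmatically; the checkpoint URI below is a placeholder, not a value taken from this commit:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keep working state on the TaskManager heap, but write checkpoint snapshots
// to a durable file system (placeholder path)
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
{% endhighlight %}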
-
-
-## State Checkpoints in Iterative Jobs
-
-Flink currently only provides processing guarantees for jobs without 
iterations. Enabling checkpointing on an iterative job causes an exception. In 
order to force checkpointing on an iterative program the user needs to set a 
special flag when enabling checkpointing: `env.enableCheckpointing(interval, 
force = true)`.
-
-Please note that records in flight in the loop edges (and the state changes 
associated with them) will be lost during failure.
-
-{% top %}
-
-
-## Restart Strategies
-
-Flink supports different restart strategies which control how the jobs are 
restarted in case of a failure. For more 
-information, see [Restart Strategies]({{ site.baseurl 
}}/dev/restart_strategies.html).
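
As a hedged example (attempt count and delay are placeholders), a fixed-delay restart strategy can be set directly on the execution environment:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// restart the job at most 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                                // number of restart attempts
        Time.of(10, TimeUnit.SECONDS)));  // delay between attempts
{% endhighlight %}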
-
-{% top %}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md 
b/docs/dev/stream/process_function.md
index fb5f39d..696a8b8 100644
--- a/docs/dev/stream/process_function.md
+++ b/docs/dev/stream/process_function.md
@@ -38,7 +38,7 @@ all (acyclic) streaming applications:
 The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to 
keyed state and timers. It handles events
 by being invoked for each event received in the input stream(s).
 
-For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed 
state](state.html), accessible via the
+For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed 
state](state/state.html), accessible via the
 `RuntimeContext`, similar to the way other stateful functions can access keyed 
state.
 
 The timers allow applications to react to changes in processing time and in 
[event time](../event_time.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/queryable_state.md 
b/docs/dev/stream/queryable_state.md
deleted file mode 100644
index 234be51..0000000
--- a/docs/dev/stream/queryable_state.md
+++ /dev/null
@@ -1,291 +0,0 @@
----
-title: "Queryable State"
-nav-parent_id: streaming
-nav-pos: 61
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* ToC
-{:toc}
-
-<div class="alert alert-warning">
-  <strong>Note:</strong> The client APIs for queryable state are currently in 
an evolving state and
-  there are <strong>no guarantees</strong> made about stability of the 
provided interfaces. It is
-  likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
-</div>
-
-In a nutshell, this feature allows users to query Flink's managed partitioned 
state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state.html)) from 
outside of
-Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
-operations/transactions with external systems such as key-value stores which 
are often the
-bottleneck in practice.
-
-<div class="alert alert-warning">
-  <strong>Attention:</strong> Queryable state accesses keyed state from a 
concurrent thread rather
-  than synchronizing with the operator and potentially blocking its operation. 
Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but instead 
directly
-  references the stored values, read-modify-write patterns are unsafe and may 
cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
-</div>
-
-## Making State Queryable
-
-In order to make state queryable, the queryable state server first needs to be 
enabled globally
-by setting the `query.server.enable` configuration parameter to `true` 
(current default).
-Then appropriate state needs to be made queryable by using either
-
-* a `QueryableStateStream`, a convenience object which behaves like a sink and 
offers incoming values as
-queryable state, or
-* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the 
keyed state of an
-operator queryable.
-
-The following sections explain the use of these two approaches.
-
-### Queryable State Stream
-
-A `KeyedStream` may offer its values as queryable state by using the following 
methods:
-
-{% highlight java %}
-// ValueState
-QueryableStateStream asQueryableState(
-    String queryableStateName,
-    ValueStateDescriptor stateDescriptor)
-
-// Shortcut for explicit ValueStateDescriptor variant
-QueryableStateStream asQueryableState(String queryableStateName)
-
-// FoldingState
-QueryableStateStream asQueryableState(
-    String queryableStateName,
-    FoldingStateDescriptor stateDescriptor)
-
-// ReducingState
-QueryableStateStream asQueryableState(
-    String queryableStateName,
-    ReducingStateDescriptor stateDescriptor)
-{% endhighlight %}
-
-
-<div class="alert alert-info">
-  <strong>Note:</strong> There is no queryable <code>ListState</code> sink as 
it would result in an ever-growing
-  list which may not be cleaned up and thus will eventually consume too much 
memory.
-</div>
-
-A call to these methods returns a `QueryableStateStream`, which cannot be 
further transformed and
-currently only holds the name as well as the value and key serializer for the 
queryable state
-stream. It is comparable to a sink, and cannot be followed by further 
transformations.
-
-Internally a `QueryableStateStream` gets translated to an operator which uses 
all incoming
-records to update the queryable state instance.
-In a program like the following, all records of the keyed stream will be used 
to update the state
-instance, either via `ValueState#update(value)` or 
`AppendingState#add(value)`, depending on
-the chosen state variant:
-{% highlight java %}
-stream.keyBy(0).asQueryableState("query-name")
-{% endhighlight %}
-This acts like the Scala API's `flatMapWithState`.
-
-### Managed Keyed State
-
-Managed keyed state of an operator
-(see [Using Managed Keyed State]({{ site.baseurl 
}}/dev/stream/state.html#using-managed-keyed-state))
-can be made queryable by making the appropriate state descriptor queryable via
-`StateDescriptor#setQueryable(String queryableStateName)`, as in the example 
below:
-{% highlight java %}
-ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
-        new ValueStateDescriptor<>(
-                "average", // the state name
-                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 
type information
-                Tuple2.of(0L, 0L)); // default value of the state, if nothing 
was set
-descriptor.setQueryable("query-name"); // queryable state name
-{% endhighlight %}
-<div class="alert alert-info">
-  <strong>Note:</strong> The `queryableStateName` parameter may be chosen 
arbitrarily and is only
-  used for queries. It does not have to be identical to the state's own name.
-</div>
-
-
-## Querying State
-
-The `QueryableStateClient` helper class may be used for queries against the 
`KvState` instances that
-serve the state internally. It needs to be set up with a valid `JobManager` 
address and port and is
-created as follows:
-
-{% highlight java %}
-final Configuration config = new Configuration();
-config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
-config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
-
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-QueryableStateClient client = new QueryableStateClient(config, 
highAvailabilityServices);
-{% endhighlight %}
-
-The query method is this:
-
-{% highlight java %}
-Future<byte[]> getKvState(
-    JobID jobID,
-    String queryableStateName,
-    int keyHashCode,
-    byte[] serializedKeyAndNamespace)
-{% endhighlight %}
-
-A call to this method returns a `Future` eventually holding the serialized 
state value for the
-queryable state instance identified by `queryableStateName` of the job with ID 
`jobID`. The
-`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` 
and the
-`serializedKeyAndNamespace` is the serialized key and namespace.
-<div class="alert alert-info">
-  <strong>Note:</strong> The client is asynchronous and can be shared by 
multiple threads. It needs
-  to be shutdown via <code>QueryableStateClient#shutdown()</code> when unused 
in order to free
-  resources.
-</div>
-
-The current implementation is still pretty low-level in the sense that it only 
works with
-serialized data both for providing the key/namespace and the returned results. 
It is the
-responsibility of the user (or some follow-up utilities) to set up the 
serializers for this. The
-nice thing about this is that the query services don't have to get into the 
business of worrying
-about any class loading issues etc.
-
-There are some serialization utils for key/namespace and value serialization 
included in
-`KvStateRequestSerializer`.
-
-### Example
-
-The following example extends the `CountWindowAverage` example
-(see [Using Managed Keyed State]({{ site.baseurl 
}}/dev/stream/state.html#using-managed-keyed-state))
-by making it queryable and showing how to query this value:
-
-{% highlight java %}
-public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
-
-    private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
-
-    @Override
-    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
-        Tuple2<Long, Long> currentSum = sum.value();
-        currentSum.f0 += 1;
-        currentSum.f1 += input.f1;
-        sum.update(currentSum);
-
-        if (currentSum.f0 >= 2) {
-            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
-            sum.clear();
-        }
-    }
-
-    @Override
-    public void open(Configuration config) {
-        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
-                new ValueStateDescriptor<>(
-                        "average", // the state name
-                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}), // type information
-                        Tuple2.of(0L, 0L)); // default value of the state, if 
nothing was set
-        descriptor.setQueryable("query-name");
-        sum = getRuntimeContext().getState(descriptor);
-    }
-}
-{% endhighlight %}
-
-Once used in a job, you can retrieve the job ID and then query any key's 
current state from this operator:
-
-{% highlight java %}
-final Configuration config = new Configuration();
-config.setString(JobManagerOptions.ADDRESS, queryAddress);
-config.setInteger(JobManagerOptions.PORT, queryPort);
-
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-QueryableStateClient client = new QueryableStateClient(config, 
highAvailabilityServices);
-
-final TypeSerializer<Long> keySerializer =
-        TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new 
ExecutionConfig());
-final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
-        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}).createSerializer(new ExecutionConfig());
-
-final byte[] serializedKey =
-        KvStateRequestSerializer.serializeKeyAndNamespace(
-                key, keySerializer,
-                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-
-Future<byte[]> serializedResult =
-        client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
-
-// now wait for the result and return it
-final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
-byte[] serializedValue = Await.result(serializedResult, duration);
-Tuple2<Long, Long> value =
-        KvStateRequestSerializer.deserializeValue(serializedValue, 
valueSerializer);
-{% endhighlight %}
-
-### Note for Scala Users
-
-Please use the available Scala extensions when creating the `TypeSerializer` 
instances. Add the following import:
-
-```scala
-import org.apache.flink.streaming.api.scala._
-```
-
-Now you can create the type serializers as follows:
-
-```scala
-val keySerializer = createTypeInformation[Long]
-  .createSerializer(new ExecutionConfig)
-```
-
-If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime.
-
-## Configuration
-
-The following configuration parameters influence the behaviour of the 
queryable state server and client.
-They are defined in `QueryableStateOptions`.
-
-### Server
-* `query.server.enable`: flag to indicate whether to start the queryable state 
server
-* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick 
random available port)
-* `query.server.network-threads`: number of network (event loop) threads for 
the `KvStateServer` (0 => #slots)
-* `query.server.query-threads`: number of asynchronous query threads for the 
`KvStateServerHandler` (0 => #slots).
-
-### Client (`QueryableStateClient`)
-* `query.client.network-threads`: number of network (event loop) threads for 
the `KvStateClient` (0 => number of available cores)
-* `query.client.lookup.num-retries`: number of retries on location lookup 
failures
-* `query.client.lookup.retry-delay`: retry delay on location lookup failures 
(millis)
-
-## Limitations
-
-* The queryable state life-cycle is bound to the life-cycle of the job, e.g. 
tasks register
-queryable state on startup and unregister it on disposal. In future versions, 
it is desirable to
-decouple this in order to allow queries after a task finishes, and to speed up 
recovery via state
-replication.
-* Notifications about available KvState happen via a simple tell. In the 
future this should be improved to be
-more robust with asks and acknowledgements.
-* The server and client keep track of statistics for queries. These are 
currently disabled by
-default as they would not be exposed anywhere. As soon as there is better 
support to publish these
-numbers via the Metrics system, we should enable the stats.

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
deleted file mode 100644
index dd61c74..0000000
--- a/docs/dev/stream/state.md
+++ /dev/null
@@ -1,768 +0,0 @@
----
-title: "Working with State"
-nav-parent_id: streaming
-nav-pos: 40
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* ToC
-{:toc}
-
-Stateful functions and operators store data across the processing of 
individual elements/events, making state a critical building block for
-any type of more elaborate operation. For example:
-
-  - When an application searches for certain event patterns, the state will 
store the sequence of events encountered so far.
-  - When aggregating events per minute, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the 
state holds the current version of the model parameters.
-
-In order to make state fault tolerant, Flink needs to be aware of the state 
and [checkpoint](checkpointing.html) it.
-In many cases, Flink can also *manage* the state for the application, meaning 
Flink deals with the memory management (possibly spilling to disk
-if necessary) to allow applications to hold very large state.
-
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-
-## Keyed State and Operator State
-
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
-
-### Keyed State
-
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
-
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of <parallel-operator-instance, key>, and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as <operator, key>.
-
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
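
To make this concrete, a minimal sketch (the sample elements are invented for illustration): keyed state can only be registered on operators downstream of a `keyBy()`.

{% highlight java %}
// keyed state is only available after keyBy(); each parallel instance of a
// keyed operator then owns the state of one or more Key Groups
DataStream<Tuple2<String, Long>> events =
        env.fromElements(Tuple2.of("sensor-1", 5L), Tuple2.of("sensor-2", 7L));

KeyedStream<Tuple2<String, Long>, Tuple> keyedEvents = events.keyBy(0);
{% endhighlight %}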
-
-### Operator State
-
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector](../connectors/kafka.html) is a good motivating example 
for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
-
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution.
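
For illustration, a sketch of a sink that keeps non-keyed (operator) state through the `CheckpointedFunction` interface; the buffering logic and names are illustrative, not taken from the Kafka connector:

{% highlight java %}
public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer; // managed operator state
    private final List<String> buffer = new ArrayList<>();  // elements not yet flushed

    @Override
    public void invoke(String value) throws Exception {
        buffer.add(value);
        // ... flush the buffer to the external system once it is large enough ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // copy the in-flight elements into the managed operator state
        checkpointedBuffer.clear();
        for (String element : buffer) {
            checkpointedBuffer.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("buffered-elements", String.class);

        // getListState() uses the even-split redistribution scheme on rescaling
        checkpointedBuffer = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (String element : checkpointedBuffer.get()) {
                buffer.add(element);
            }
        }
    }
}
{% endhighlight %}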
-
-## Raw and Managed State
-
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
-
-*Managed State* is represented in data structures controlled by the Flink runtime,
-such as internal hash tables or RocksDB.
-Examples are `ValueState`, `ListState`, etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
-
-*Raw State* is state that operators keep in their own data structures. When 
checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees 
only the raw bytes.
-
-All datastream functions can use managed state, but the raw state interfaces 
can only be used when implementing operators.
-Using managed state (rather than raw state) is recommended, since with
-managed state Flink is able to automatically redistribute state when the 
parallelism is
-changed, and also do better memory management.
-
-## Using Managed Keyed State
-
-The managed keyed state interface provides access to different types of state 
that are all scoped to
-the key of the current input element. This means that this type of state can 
only be used
-on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
-
-Now, we will first look at the different types of state available and then we 
will see
-how they can be used in a program. The available state primitives are:
-
-* `ValueState<T>`: This keeps a value that can be updated and
-retrieved (scoped to key of the input element as mentioned above, so there 
will possibly be one value
-for each key that the operation sees). The value can be set using `update(T)` 
and retrieved using
-`T value()`.
-
-* `ListState<T>`: This keeps a list of elements. You can append elements and 
retrieve an `Iterable`
-over all currently stored elements. Elements are added using `add(T)`, the 
Iterable can
-be retrieved using `Iterable<T> get()`.
-
-* `ReducingState<T>`: This keeps a single value that represents the 
aggregation of all values
-added to the state. The interface is the same as for `ListState` but elements 
added using
-`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
-
-* `FoldingState<T, ACC>`: This keeps a single value that represents the 
aggregation of all values
-added to the state. Contrary to `ReducingState`, the aggregate type may be 
different from the type
-of elements that are added to the state. The interface is the same as for 
`ListState` but elements
-added using `add(T)` are folded into an aggregate using a specified 
`FoldFunction`.
-
-* `MapState<UK, UV>`: This keeps a set of mappings. You can put key-value
pairs into the state and
-retrieve an `Iterable` over all currently stored mappings. Mappings are added 
using `put(UK, UV)` or
-`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved 
using `get(UK)`. The iterable
-views for mappings, keys and values can be retrieved using `entries()`, 
`keys()` and `values()` respectively.
-
-All types of state also have a method `clear()` that clears the state for the 
currently
-active key, i.e. the key of the input element.
-
-<span class="label label-danger">Attention</span> `FoldingState` will be 
deprecated in one of
-the next versions of Flink and will be completely removed in the future. A 
more general
-alternative will be provided.
-
-It is important to keep in mind that these state objects are only used for interfacing
-with state. The state is not necessarily stored inside the state object itself but might
-reside on disk or somewhere else.
-The second thing to keep in mind is that the value you get from the state
-depends on the key of the input element. So the value you get in one 
invocation of your
-user function can differ from the value in another invocation if the keys 
involved are different.
-
-To get a state handle, you have to create a `StateDescriptor`. This holds the 
name of the state
-(as we will see later, you can create several states, and they have to have 
unique names so
-that you can reference them), the type of the values that the state holds, and 
possibly
-a user-specified function, such as a `ReduceFunction`. Depending on what type 
of state you
-want to retrieve, you create either a `ValueStateDescriptor`, a 
`ListStateDescriptor`,
-a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a 
`MapStateDescriptor`.
-
-State is accessed using the `RuntimeContext`, so it is only possible in *rich 
functions*.
-Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for
-information about that, but we will also see an example shortly. The 
`RuntimeContext` that
-is available in a `RichFunction` has these methods for accessing state:
-
-* `ValueState<T> getState(ValueStateDescriptor<T>)`
-* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
-* `ListState<T> getListState(ListStateDescriptor<T>)`
-* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
-* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
-
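-For example, a `ReducingState` handle that keeps a running sum could be obtained as
-follows (a minimal sketch; the state name `"sum"` and the lambda are only illustrative):
-
-{% highlight java %}
-// inside open() of a rich function running on a KeyedStream
-ReducingStateDescriptor<Long> sumDescriptor =
-    new ReducingStateDescriptor<>(
-        "sum",           // the state name
-        (a, b) -> a + b, // the user-specified ReduceFunction
-        Long.class);     // type of the values held by the state
-
-ReducingState<Long> sum = getRuntimeContext().getReducingState(sumDescriptor);
-{% endhighlight %}
-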
-This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
-
-    /**
-     * The ValueState handle. The first field is the count, the second field a 
running sum.
-     */
-    private transient ValueState<Tuple2<Long, Long>> sum;
-
-    @Override
-    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
-
-        // access the state value
-        Tuple2<Long, Long> currentSum = sum.value();
-
-        // update the count
-        currentSum.f0 += 1;
-
-        // add the second field of the input value
-        currentSum.f1 += input.f1;
-
-        // update the state
-        sum.update(currentSum);
-
-        // if the count reaches 2, emit the average and clear the state
-        if (currentSum.f0 >= 2) {
-            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
-            sum.clear();
-        }
-    }
-
-    @Override
-    public void open(Configuration config) {
-        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
-                new ValueStateDescriptor<>(
-                        "average", // the state name
-                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}), // type information
-                        Tuple2.of(0L, 0L)); // default value of the state, if 
nothing was set
-        sum = getRuntimeContext().getState(descriptor);
-    }
-}
-
-// this can be used in a streaming program like this (assuming we have a 
StreamExecutionEnvironment env)
-env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), 
Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
-        .keyBy(0)
-        .flatMap(new CountWindowAverage())
-        .print();
-
-// the printed output will be (1,4) and (1,5)
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, 
Long)] {
-
-  private var sum: ValueState[(Long, Long)] = _
-
-  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): 
Unit = {
-
-    // access the state value
-    val tmpCurrentSum = sum.value
-
-    // If it hasn't been used before, it will be null
-    val currentSum = if (tmpCurrentSum != null) {
-      tmpCurrentSum
-    } else {
-      (0L, 0L)
-    }
-
-    // update the count
-    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
-
-    // update the state
-    sum.update(newSum)
-
-    // if the count reaches 2, emit the average and clear the state
-    if (newSum._1 >= 2) {
-      out.collect((input._1, newSum._2 / newSum._1))
-      sum.clear()
-    }
-  }
-
-  override def open(parameters: Configuration): Unit = {
-    sum = getRuntimeContext.getState(
-      new ValueStateDescriptor[(Long, Long)]("average", 
createTypeInformation[(Long, Long)])
-    )
-  }
-}
-
-
-object ExampleCountWindowAverage extends App {
-  val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-  env.fromCollection(List(
-    (1L, 3L),
-    (1L, 5L),
-    (1L, 7L),
-    (1L, 4L),
-    (1L, 2L)
-  )).keyBy(_._1)
-    .flatMap(new CountWindowAverage())
-    .print()
-  // the printed output will be (1,4) and (1,5)
-
-  env.execute("ExampleManagedState")
-}
-{% endhighlight %}
-</div>
-</div>
-
-This example implements a poor man's counting window. We key the tuples by the 
first field
-(in the example all have the same key `1`). The function stores the count and 
a running sum in
-a `ValueState`. Once the count reaches 2 it will emit the average and clear 
the state so that
-we start over from `0`. Note that this would keep a different state value for 
each different input
-key if we had tuples with different values in the first field.
-
-### State in the Scala DataStream API
-
-In addition to the interface described above, the Scala API has shortcuts for 
stateful
-`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. 
The user function
-gets the current value of the `ValueState` in an `Option` and must return an 
updated value that
-will be used to update the state.
-
-{% highlight scala %}
-val stream: DataStream[(String, Int)] = ...
-
-val counts: DataStream[(String, Int)] = stream
-  .keyBy(_._1)
-  .mapWithState((in: (String, Int), count: Option[Int]) =>
-    count match {
-      case Some(c) => ( (in._1, c), Some(c + in._2) )
-      case None => ( (in._1, 0), Some(in._2) )
-    })
-{% endhighlight %}
-
-## Using Managed Operator State
-
-To use managed operator state, a stateful function can implement either the 
more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface.
-
-#### CheckpointedFunction
-
-The `CheckpointedFunction` interface provides access to non-keyed state with 
different
-redistribution schemes. It requires the implementation of two methods:
-
-{% highlight java %}
-void snapshotState(FunctionSnapshotContext context) throws Exception;
-
-void initializeState(FunctionInitializationContext context) throws Exception;
-{% endhighlight %}
-
-Whenever a checkpoint has to be performed, `snapshotState()` is called. The 
counterpart, `initializeState()`,
-is called every time the user-defined function is initialized, be that when 
the function is first initialized
-or be that when the function is actually recovering from an earlier 
checkpoint. Given this, `initializeState()` is not
-only the place where different types of state are initialized, but also where 
state recovery logic is included.
-
-Currently, list-style managed operator state is supported. The state
-is expected to be a `List` of *serializable* objects, independent from each 
other,
-thus eligible for redistribution upon rescaling. In other words, these objects 
are the finest granularity at which
-non-keyed state can be redistributed. Depending on the state accessing method,
-the following redistribution schemes are defined:
-
-  - **Even-split redistribution:** Each operator returns a List of state 
elements. The whole state is logically a concatenation of
-    all lists. On restore/redistribution, the list is evenly divided into as 
many sublists as there are parallel operators.
-    Each operator gets a sublist, which can be empty, or contain one or more 
elements.
-    As an example, if with parallelism 1 the checkpointed state of an operator
-    contains elements `element1` and `element2`, when increasing the 
parallelism to 2, `element1` may end up in operator instance 0,
-    while `element2` will go to operator instance 1.
-
-  - **Union redistribution:** Each operator returns a List of state elements. 
The whole state is logically a concatenation of
-    all lists. On restore/redistribution, each operator gets the complete list 
of state elements.
-
-Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction`
-to buffer elements before sending them to the outside world. It demonstrates
-list state with the basic even-split redistribution scheme:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class BufferingSink
-        implements SinkFunction<Tuple2<String, Integer>>,
-                   CheckpointedFunction,
-                   CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-        checkpointedState.clear();
-        for (Tuple2<String, Integer> element : bufferedElements) {
-            checkpointedState.add(element);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws 
Exception {
-        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
-            new ListStateDescriptor<>(
-                "buffered-elements",
-                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
-
-        checkpointedState = 
context.getOperatorStateStore().getListState(descriptor);
-
-        if (context.isRestored()) {
-            for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                bufferedElements.add(element);
-            }
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws 
Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class BufferingSink(threshold: Int = 0)
-  extends SinkFunction[(String, Int)]
-    with CheckpointedFunction
-    with CheckpointedRestoring[List[(String, Int)]] {
-
-  @transient
-  private var checkpointedState: ListState[(String, Int)] = null
-
-  private val bufferedElements = ListBuffer[(String, Int)]()
-
-  override def invoke(value: (String, Int)): Unit = {
-    bufferedElements += value
-    if (bufferedElements.size == threshold) {
-      for (element <- bufferedElements) {
-        // send it to the sink
-      }
-      bufferedElements.clear()
-    }
-  }
-
-  override def snapshotState(context: FunctionSnapshotContext): Unit = {
-    checkpointedState.clear()
-    for (element <- bufferedElements) {
-      checkpointedState.add(element)
-    }
-  }
-
-  override def initializeState(context: FunctionInitializationContext): Unit = 
{
-    val descriptor = new ListStateDescriptor[(String, Int)](
-      "buffered-elements",
-      TypeInformation.of(new TypeHint[(String, Int)]() {})
-    )
-
-    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
-
-    if(context.isRestored) {
-      for(element <- checkpointedState.get()) {
-        bufferedElements += element
-      }
-    }
-  }
-
-  override def restoreState(state: List[(String, Int)]): Unit = {
-    bufferedElements ++= state
-  }
-}
-{% endhighlight %}
-</div>
-</div>
-
-The `initializeState` method takes as argument a `FunctionInitializationContext`.
-This is used to initialize the non-keyed state "containers". These are containers
-of type `ListState` in which the non-keyed state objects are stored upon checkpointing.
-
-Note how the state is initialized, similar to keyed state,
-with a `StateDescriptor` that contains the state name and information
-about the type of the value that the state holds:
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ListStateDescriptor<Tuple2<String, Integer>> descriptor =
-    new ListStateDescriptor<>(
-        "buffered-elements",
-        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
-
-checkpointedState = context.getOperatorStateStore().getListState(descriptor);
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-val descriptor = new ListStateDescriptor[(String, Int)](
-    "buffered-elements",
-    TypeInformation.of(new TypeHint[(String, Int)]() {})
-)
-
-checkpointedState = context.getOperatorStateStore.getListState(descriptor)
-
-{% endhighlight %}
-</div>
-</div>
-
-The naming convention of the state access methods contains the redistribution
-pattern followed by the state structure. For example, to use list state with the
-union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`.
-If the method name does not contain the redistribution pattern, *e.g.* `getListState(descriptor)`,
-it simply implies that the basic even-split redistribution scheme will be used.
-
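-As a minimal sketch, assuming the `descriptor` from the snippet above, the two variants
-look like this:
-
-{% highlight java %}
-// even-split redistribution (as used by the BufferingSink example)
-checkpointedState = context.getOperatorStateStore().getListState(descriptor);
-
-// union redistribution: on restore, every parallel instance gets the complete list
-checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);
-{% endhighlight %}
-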
-After initializing the container, we use the `isRestored()` method of the 
context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the 
restore logic is applied.
-
-As shown in the code of `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`.
-There the `ListState` is cleared of all objects included in the previous checkpoint,
-and is then filled with the new ones we want to checkpoint.
-
-As a side note, the keyed state can also be initialized in the 
`initializeState()` method. This can be done
-using the provided `FunctionInitializationContext`.
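-
-For instance, a minimal sketch (the state name `"keyed-count"` is only illustrative;
-this is valid only when the function runs on a `KeyedStream`):
-
-{% highlight java %}
-@Override
-public void initializeState(FunctionInitializationContext context) throws Exception {
-    // ... operator (non-keyed) state initialization as shown above ...
-
-    // keyed state can be obtained from the same context
-    ValueState<Long> keyedCount = context
-        .getKeyedStateStore()
-        .getState(new ValueStateDescriptor<>("keyed-count", Long.class));
-}
-{% endhighlight %}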
-
-#### ListCheckpointed
-
-The `ListCheckpointed` interface is a more limited variant of 
`CheckpointedFunction`,
-which only supports list-style state with even-split redistribution scheme on 
restore.
-It also requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-On `snapshotState()` the operator should return a list of objects to checkpoint, and
-`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable,
-you can always return `Collections.singletonList(MY_STATE)` from `snapshotState()`.
-
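-As a minimal sketch (the class name and types are only illustrative), a function that
-counts the elements seen by each parallel instance could look like this:
-
-{% highlight java %}
-public class CountingFunction
-        extends RichFlatMapFunction<String, String>
-        implements ListCheckpointed<Long> {
-
-    /** number of elements seen by this parallel instance */
-    private long count;
-
-    @Override
-    public void flatMap(String value, Collector<String> out) {
-        count++;
-        out.collect(value);
-    }
-
-    @Override
-    public List<Long> snapshotState(long checkpointId, long timestamp) {
-        // a single-element list; entries may be redistributed on rescaling
-        return Collections.singletonList(count);
-    }
-
-    @Override
-    public void restoreState(List<Long> state) {
-        // an instance may receive zero, one, or several entries after rescaling
-        count = 0;
-        for (Long c : state) {
-            count += c;
-        }
-    }
-}
-{% endhighlight %}
-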
-### Stateful Source Functions
-
-Stateful sources require a bit more care compared to other operators.
-In order to make updates to the state and output collection atomic (required for
-exactly-once semantics on failure/recovery), the user is required to get a lock from
-the source's context.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public static class CounterSource
-        extends RichParallelSourceFunction<Long>
-        implements ListCheckpointed<Long> {
-
-    /**  current offset for exactly once semantics */
-    private Long offset;
-
-    /** flag for job cancellation */
-    private volatile boolean isRunning = true;
-
-    @Override
-    public void run(SourceContext<Long> ctx) {
-        final Object lock = ctx.getCheckpointLock();
-
-        while (isRunning) {
-            // output and state update are atomic
-            synchronized (lock) {
-                ctx.collect(offset);
-                offset += 1;
-            }
-        }
-    }
-
-    @Override
-    public void cancel() {
-        isRunning = false;
-    }
-
-    @Override
-    public List<Long> snapshotState(long checkpointId, long 
checkpointTimestamp) {
-        return Collections.singletonList(offset);
-    }
-
-    @Override
-    public void restoreState(List<Long> state) {
-        for (Long s : state)
-            offset = s;
-    }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class CounterSource
-       extends RichParallelSourceFunction[Long]
-       with ListCheckpointed[Long] {
-
-  @volatile
-  private var isRunning = true
-
-  private var offset = 0L
-
-  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
-    val lock = ctx.getCheckpointLock
-
-    while (isRunning) {
-      // output and state update are atomic
-      lock.synchronized({
-        ctx.collect(offset)
-
-        offset += 1
-      })
-    }
-  }
-
-  override def cancel(): Unit = isRunning = false
-
-  override def restoreState(state: util.List[Long]): Unit =
-    for (s <- state) {
-      offset = s
-    }
-
-  override def snapshotState(checkpointId: Long, timestamp: Long): 
util.List[Long] =
-    Collections.singletonList(offset)
-
-}
-{% endhighlight %}
-</div>
-</div>
-
-Some operators might need to know when a checkpoint has been fully acknowledged by Flink
-in order to communicate that with the outside world. For this, see the
-`org.apache.flink.runtime.state.CheckpointListener` interface.
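-
-As a minimal sketch (the sink and its behaviour are only illustrative), a sink that
-publishes staged data once a checkpoint completes could be structured like this:
-
-{% highlight java %}
-public class CheckpointAwareSink
-        implements SinkFunction<String>, CheckpointListener {
-
-    @Override
-    public void invoke(String value) throws Exception {
-        // stage the value in some buffered or transactional fashion
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // called when the checkpoint with the given id has been fully acknowledged;
-        // a good place to make the staged data visible to the outside world
-    }
-}
-{% endhighlight %}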
-
-## Custom Serialization for Managed State
-
-This section serves as a guideline for users who require custom serialization for
-their state, covering how to provide a custom serializer and how to handle upgrades
-to the serializer for compatibility. If you're simply using Flink's own serializers,
-this section is irrelevant and can be skipped.
-
-### Using custom serializers
-
-As demonstrated in the above examples, when registering a managed operator or 
keyed state, a `StateDescriptor` is required
-to specify the state's name, as well as information about the type of the 
state. The type information is used by Flink's
-[type serialization framework](../types_serialization.html) to create 
appropriate serializers for the state.
-
-It is also possible to completely bypass this and let Flink use your own 
custom serializer to serialize managed states,
-simply by directly instantiating the `StateDescriptor` with your own 
`TypeSerializer` implementation:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, 
Integer>> {...};
-
-ListStateDescriptor<Tuple2<String, Integer>> descriptor =
-    new ListStateDescriptor<>(
-        "state-name",
-        new CustomTypeSerializer());
-
-checkpointedState = getRuntimeContext().getListState(descriptor);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
-
-val descriptor = new ListStateDescriptor[(String, Integer)](
-    "state-name",
-    new CustomTypeSerializer)
-
-checkpointedState = getRuntimeContext.getListState(descriptor)
-{% endhighlight %}
-</div>
-</div>
-
-Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see the following
-subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
-anonymous classes as your state serializers. Anonymous classes have no guaranteed generated class name; it varies across
-compilers and depends on the order in which they are instantiated within the enclosing class. This can easily cause the
-previously written serializer to be unreadable (since the original class can no longer be found in the classpath).
-
-### Handling serializer upgrades and compatibility
-
-Flink allows changing the serializers used to read and write managed state, so 
that users are not locked in to any
-specific serialization scheme. When state is restored, the new serializer registered for the state (i.e., the serializer
-that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility
-and, if compatible, replaces the previous serializer for the state.
-
-A serializer is compatible if it is capable of reading the previously serialized bytes of the state,
-and if the binary format it writes for the state remains identical. The means to check the new serializer's compatibility
-is provided through the following two methods of the `TypeSerializer` interface:
-
-{% highlight java %}
-public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
-public abstract CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
-{% endhighlight %}
-
-Briefly speaking, every time a checkpoint is performed, the 
`snapshotConfiguration` method is called to create a
-point-in-time view of the state serializer's configuration. The returned 
configuration snapshot is stored along with the
-checkpoint as the state's metadata. When the checkpoint is used to restore a 
job, that serializer configuration snapshot
-will be provided to the _new_ serializer of the same state via the counterpart 
method, `ensureCompatibility`, to verify
-compatibility of the new serializer. This method serves as a check for whether 
or not the new serializer is compatible,
-as well as a hook to possibly reconfigure the new serializer in the case that 
it is incompatible.
-
-Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
-same serializer is used for the state in the restored job, the serializers will reconfigure themselves to be compatible
-with their previous configuration.
-
-The following subsections illustrate guidelines to implement these two methods 
when using custom serializers.
-
-#### Implementing the `snapshotConfiguration` method
-
-The serializer's configuration snapshot should capture enough information so that, on restore,
-the new serializer for the state can determine whether or not it is compatible.
-This typically contains information about the serializer's parameters or the binary format of the serialized data;
-generally, anything that allows the new serializer to decide whether or not it can be used to read the previously serialized
-bytes, and whether it writes in the same binary format.
-
-How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. Below
-is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`:
-
-{% highlight java %}
-public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
-  public abstract int getVersion();
-  public void read(DataInputView in) {...}
-  public void write(DataOutputView out) {...}
-}
-{% endhighlight %}
-
-The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
-implementations contain logic to read and write the version of the configuration snapshot, so they should be extended and
-not completely overridden.
-
-The version of the configuration snapshot is determined through the 
`getVersion` method. Versioning for the serializer
-configuration snapshot is the means to maintain compatible configurations, as 
information included in the configuration
-may change over time. By default, configuration snapshots are only compatible 
with the current version (as returned by
-`getVersion`). To indicate that the configuration is compatible with other 
versions, override the `getCompatibleVersions`
-method to return more version values. When reading from the checkpoint, you 
can use the `getReadVersion` method to
-determine the version of the written configuration and adapt the read logic to 
the specific version.
-
-<span class="label label-danger">Attention</span> The version of the 
serializer's configuration snapshot is **not**
-related to upgrading the serializer. The exact same serializer can have 
different implementations of its
-configuration snapshot, for example when more information is added to the 
configuration to allow more comprehensive
-compatibility checks in the future.
-
-One limitation of implementing a `TypeSerializerConfigSnapshot` is that an 
empty constructor must be present. The empty
-constructor is required when reading the configuration snapshot from 
checkpoints.
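-
-As a minimal sketch (imports omitted, as in the other examples), consider a hypothetical
-serializer whose only configurable parameter is a charset name. Its configuration snapshot
-could look like this; the class and field names are purely illustrative:
-
-{% highlight java %}
-public class MyCharsetSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
-
-    private static final int VERSION = 1;
-
-    private String charsetName;
-
-    /** Empty constructor, required for reading the snapshot back from a checkpoint. */
-    public MyCharsetSerializerConfigSnapshot() {}
-
-    public MyCharsetSerializerConfigSnapshot(String charsetName) {
-        this.charsetName = charsetName;
-    }
-
-    @Override
-    public void write(DataOutputView out) throws IOException {
-        super.write(out); // writes the snapshot version
-        out.writeUTF(charsetName);
-    }
-
-    @Override
-    public void read(DataInputView in) throws IOException {
-        super.read(in); // reads and verifies the snapshot version
-        this.charsetName = in.readUTF();
-    }
-
-    @Override
-    public int getVersion() {
-        return VERSION;
-    }
-
-    public String getCharsetName() {
-        return charsetName;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        return obj instanceof MyCharsetSerializerConfigSnapshot
-            && charsetName.equals(((MyCharsetSerializerConfigSnapshot) obj).charsetName);
-    }
-
-    @Override
-    public int hashCode() {
-        return charsetName.hashCode();
-    }
-}
-{% endhighlight %}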
-
-#### Implementing the `ensureCompatibility` method
-
-The `ensureCompatibility` method should contain logic that performs checks 
against the information about the previous
-serializer carried over via the provided `TypeSerializerConfigSnapshot`, 
basically doing one of the following:
-
-  * Check whether the serializer is compatible, while possibly reconfiguring 
itself (if required) so that it may be
-    compatible. Afterwards, acknowledge with Flink that the serializer is 
compatible.
-
-  * Acknowledge that the serializer is incompatible and that state migration 
is required before Flink can proceed with
-    using the new serializer.
-
-The above cases can be translated to code by returning one of the following
-from the `ensureCompatibility` method (a sketch follows the list):
-
-  * **`CompatibilityResult.compatible()`**: This acknowledges that the new 
serializer is compatible, or has been reconfigured to
-    be compatible, and Flink can proceed with the job with the serializer as 
is.
-
-  * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be
-    reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration
-    is performed by using the previous serializer to read the restored state bytes back into objects, which are then
-    serialized again using the new serializer.
-
-  * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics
-    to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded
-    to read the restored state bytes for the migration, the provided `TypeDeserializer` can be used as a fallback.
-
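-Continuing the hypothetical charset serializer sketched above (class and field names are
-only illustrative), `ensureCompatibility` could be implemented like this:
-
-{% highlight java %}
-@Override
-public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-    if (configSnapshot instanceof MyCharsetSerializerConfigSnapshot) {
-        String previousCharset =
-            ((MyCharsetSerializerConfigSnapshot) configSnapshot).getCharsetName();
-
-        if (previousCharset.equals(this.charsetName)) {
-            // same binary format; the serializer can be used as is
-            return CompatibilityResult.compatible();
-        }
-    }
-
-    // unknown snapshot type or different charset: the binary format may have changed
-    return CompatibilityResult.requiresMigration();
-}
-{% endhighlight %}
-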
-<span class="label label-danger">Attention</span> Currently, as of Flink 1.3, 
if the result of the compatibility check
-acknowledges that state migration needs to be performed, the job simply fails 
to restore from the checkpoint as state
-migration is currently not available. The ability to migrate state will be 
introduced in future releases.
-
-### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in 
user code
-
-Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as 
part of checkpoints along with the state
-values, the availability of the classes within the classpath may affect 
restore behaviour.
-
-`TypeSerializer`s are directly written into checkpoints using Java Object Serialization. If the new
-serializer acknowledges that it is incompatible and requires state migration, the previously written serializer
-must be available to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified
-(resulting in a different `serialVersionUID`) as a result of a serializer upgrade for the state, the restore would
-not be able to proceed. The alternative to this requirement is to provide a fallback `TypeDeserializer` when
-acknowledging that state migration is required, using `CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`.
-
-The classes of `TypeSerializerConfigSnapshot`s in the restored checkpoint must exist in the classpath, as they are
-fundamental components of the compatibility checks on upgraded serializers; the snapshots cannot be restored if their classes
-are not present. Since configuration snapshots are written to checkpoints using custom serialization, the implementation
-of the class is free to change, as long as compatibility of the configuration change is handled using the versioning
-mechanism in `TypeSerializerConfigSnapshot`.
