alpinegizmo commented on a change in pull request #14932:
URL: https://github.com/apache/flink/pull/14932#discussion_r582692404



##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -179,18 +190,19 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
 
 {{< top >}}
 
-
-## Selecting a State Backend
+## Selecting Checkpoint Storage
 
 Flink's [checkpointing mechanism]({{< ref "docs/learn-flink/fault_tolerance" 
>}}) stores consistent snapshots
 of all the state in timers and stateful operators, including connectors, 
windows, and any [user-defined state](state.html).
 Where the checkpoints are stored (e.g., JobManager memory, file system, 
database) depends on the configured
-**State Backend**. 
+**Checkpoint Storage**. 
 
-By default, state is kept in memory in the TaskManagers and checkpoints are 
stored in memory in the JobManager. For proper persistence of large state,
-Flink supports various approaches for storing and checkpointing state in other 
state backends. The choice of state backend can be configured via 
`StreamExecutionEnvironment.setStateBackend(…)`.
+By default, checkpoints are stored in memory in the JobManager. For proper 
persistence of large state,
+Flink supports various approaches for checkpointing state in other locations. 
+The choice of checkpoint storag ecan be configured via 
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.
+It is highly encouraged that checkpoints are stored in a highly-available 
filesystem for most production deployments. 

Review comment:
       I don't see any reason to equivocate here.
   
   ```suggestion
   It is strongly encouraged that checkpoints be stored in a highly-available 
filesystem for production deployments. 
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -179,18 +190,19 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
 
 {{< top >}}
 
-
-## Selecting a State Backend
+## Selecting Checkpoint Storage
 
 Flink's [checkpointing mechanism]({{< ref "docs/learn-flink/fault_tolerance" 
>}}) stores consistent snapshots
 of all the state in timers and stateful operators, including connectors, 
windows, and any [user-defined state](state.html).
 Where the checkpoints are stored (e.g., JobManager memory, file system, 
database) depends on the configured
-**State Backend**. 
+**Checkpoint Storage**. 
 
-By default, state is kept in memory in the TaskManagers and checkpoints are 
stored in memory in the JobManager. For proper persistence of large state,
-Flink supports various approaches for storing and checkpointing state in other 
state backends. The choice of state backend can be configured via 
`StreamExecutionEnvironment.setStateBackend(…)`.
+By default, checkpoints are stored in memory in the JobManager. For proper 
persistence of large state,
+Flink supports various approaches for checkpointing state in other locations. 
+The choice of checkpoint storag ecan be configured via 
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.

Review comment:
       ```suggestion
   The choice of checkpoint storage can be configured via 
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.
   ```

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.

Review comment:
       ```suggestion
   When checkpointing is enabled, managed state is persisted to ensure 
consistent recovery in case of failures.
   Where the state is persisted during checkpointing depends on the chosen 
**Checkpoint Storage**.
   ```

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be 
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the 
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window 
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as 
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes 
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set 
this feature, users can instantiate a `JobManagerCheckpointStorage` with the 
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+  - The size of each individual state is by default limited to 5 MB. This 
value can be increased in the constructor of the JobManagerCheckpointStorage.
+  - Irrespective of the configured maximal state size, the state cannot be 
larger than the akka frame size (see [Configuration]({{< ref 
"docs/deployment/config" >}})).
+  - The aggregate state must fit into the JobManager memory.
+
+The JobManagerCheckpointStorage is encouraged for:
+
+  - Local development and debugging
+  - Jobs that do hold little state, such as jobs that consist only of 
record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer 
requires very little state.

Review comment:
       Are there really production use cases where using 
JobManagerCheckpointStorage makes sense? Isn't the checkpoint data lost if the 
JM fails? Or does HA do something to robustly persist this data?
   
   ```suggestion
     - Jobs that use very little state, such as jobs that consist only of 
record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer 
requires very little state.
   ```

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -89,10 +147,19 @@ The checkpoint directory is not part of a public API and 
can be changed in the f
 state.checkpoints.dir: hdfs:///checkpoints/
 ```
 
-#### Configure for per job when constructing the state backend
+#### Configure for per job on the checkpoint configuration
+
+```java
+env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
+```
+
+#### Configure with checkpoint storage instance
+
+Alternativly, checkpoint storage can be set by specifying the desired 
checkpoint storage instance which allows for setting low level configurations 
such as write buffer sizes. 

Review comment:
       ```suggestion
   Alternatively, checkpoint storage can be set by specifying the desired 
checkpoint storage instance, which allows for setting low-level configuration 
options such as write buffer sizes. 
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/schema_evolution.md
##########
@@ -105,15 +105,22 @@ Flink fully supports evolving schema of Avro type state, 
as long as the schema c
 One limitation is that Avro generated classes used as the state type cannot be 
relocated or have different
 namespaces when the job is restored.
 
-{{< hint warning >}}
-Schema evolution of keys is not supported.
-{{< /hint >}}
+## Schema Migration Limiations
 
-Example: RocksDB state backend relies on binary objects identity, rather than 
`hashCode` method implementation. Any changes to the keys object structure 
could lead to non deterministic behaviour.  
+Flink's schema migration contains certain limiations to ensure corretness. For 
users that need to work

Review comment:
       ```suggestion
   Flink's schema migration has some limitations that are required to ensure 
correctness. For users that need to work
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -50,6 +50,8 @@ By default, checkpointing is disabled. To enable 
checkpointing, call `enableChec
 
 Other parameters for checkpointing include:
 
+  - *checkpoint storage*: You can set the location where checkpoints snapshots 
are made durable. By default Flink will use the JobManager's heap. For 
production deployments it is recomended to instead use a durable filesystem. 
See [checkpoint storage]({{< ref 
"docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the 
available options for job-wide and cluster-wide configuration.

Review comment:
       ```suggestion
     - *checkpoint storage*: You can set the location where checkpoint 
snapshots are made durable. By default Flink will use the JobManager's heap. 
For production deployments it is recommended to instead use a durable 
filesystem. See [checkpoint storage]({{< ref 
"docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the 
available options for job-wide and cluster-wide configuration.
   ```

##########
File path: docs/content/docs/learn-flink/fault_tolerance.md
##########
@@ -28,83 +28,108 @@ under the License.
 
 The keyed state managed by Flink is a sort of sharded, key/value store, and 
the working copy of each
 item of keyed state is kept somewhere local to the taskmanager responsible for 
that key. Operator
-state is also local to the machine(s) that need(s) it. Flink periodically 
takes persistent snapshots
-of all the state and copies these snapshots somewhere more durable, such as a 
distributed file
-system.
+state is also local to the machine(s) that need(s) it.
 
-In the event of the failure, Flink can restore the complete state of your 
application and resume
-processing as though nothing had gone wrong.
-
-This state that Flink manages is stored in a _state backend_. Two 
implementations of state backends
-are available -- one based on RocksDB, an embedded key/value store that keeps 
its working state on
+This state that Flink manages is stored in a _state backend_. 
+Two implementations of state backends are available -- one based on RocksDB, 
an embedded key/value store that keeps its working state on
 disk, and another heap-based state backend that keeps its working state in 
memory, on the Java heap.
-This heap-based state backend comes in two flavors: the FsStateBackend that 
persists its state
-snapshots to a distributed file system, and the MemoryStateBackend that uses 
the JobManager's heap.
-
-<table class="table table-bordered">
-  <thead>
-    <tr class="book-hint info">
-      <th class="text-left">Name</th>
-      <th class="text-left">Working State</th>
-      <th class="text-left">State Backup</th>
-      <th class="text-left">Snapshotting</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <th class="text-left">RocksDBStateBackend</th>
-      <td class="text-left">Local disk (tmp dir)</td>
-      <td class="text-left">Distributed file system</td>
-      <td class="text-left">Full / Incremental</td>
-    </tr>
-    <tr>
-      <td colspan="4" class="text-left">
-        <ul>
-          <li>Supports state larger than available memory</li>
-          <li>Rule of thumb: 10x slower than heap-based backends</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <th class="text-left">FsStateBackend</th>
-      <td class="text-left">JVM Heap</td>
-      <td class="text-left">Distributed file system</td>
-      <td class="text-left">Full</td>
-    </tr>
-    <tr>
-      <td colspan="4" class="text-left">
-        <ul>
-          <li>Fast, requires large heap</li>
-          <li>Subject to GC</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <th class="text-left">MemoryStateBackend</th>
-      <td class="text-left">JVM Heap</td>
-      <td class="text-left">JobManager JVM Heap</td>
-      <td class="text-left">Full</td>
-    </tr>
-    <tr>
-      <td colspan="4" class="text-left">
-        <ul>
-          <li>Good for testing and experimentation with small state 
(locally)</li>
-        </ul>
-      </td>
-    </tr>
-  </tbody>
-</table>
+
+<center>
+  <table class="table table-bordered">
+    <thead>
+      <tr class="book-hint info">
+        <th class="text-left">Name</th>
+        <th class="text-left">Working State</th>
+        <th class="text-left">Snapshotting</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <th class="text-left">EmbeddedRocksDBStateBackend</th>
+        <td class="text-left">Local disk (tmp dir)</td>
+        <td class="text-left">Full / Incremental</td>
+      </tr>
+      <tr>
+        <td colspan="4" class="text-left">
+          <ul>
+            <li>Supports state larger than available memory</li>
+            <li>Rule of thumb: 10x slower than heap-based backends</li>
+          </ul>
+        </td>
+      </tr>
+      <tr>
+        <th class="text-left">HashMapStateBackend</th>
+        <td class="text-left">JVM Heap</td>
+        <td class="text-left">Full</td>
+      </tr>
+      <tr>
+        <td colspan="4" class="text-left">
+          <ul>
+            <li>Fast, requires large heap</li>
+            <li>Subject to GC</li>
+          </ul>
+        </td>
+      </tr>
+    </tbody>
+  </table>
+</center>
 
 When working with state kept in a heap-based state backend, accesses and 
updates involve reading and
-writing objects on the heap. But for objects kept in the 
`RocksDBStateBackend`, accesses and updates
+writing objects on the heap. But for objects kept in the 
`EmbeddedRocksDBStateBackend`, accesses and updates
 involve serialization and deserialization, and so are much more expensive. But 
the amount of state
 you can have with RocksDB is limited only by the size of the local disk. Note 
also that only the
-`RocksDBStateBackend` is able to do incremental snapshotting, which is a 
significant benefit for
+`EmbeddedRocksDBStateBackend` is able to do incremental snapshotting, which is 
a significant benefit for
 applications with large amounts of slowly changing state.
 
-All of these state backends are able to do asynchronous snapshotting, meaning 
that they can take a
+Both of these state backends are able to do asynchronous snapshotting, meaning 
that they can take a
 snapshot without impeding the ongoing stream processing.
 
+## Checkpoint Storage
+
+Flink periodically takes persistent snapshots of all the state in every 
operator and copies these snapshots somewhere more durable, such as a 
distributed file system. In the event of the failure, Flink can restore the 
complete state of your application and resume
+processing as though nothing had gone wrong.
+
+The location where these snapshots are stored is defined via the jobs 
_checkpoint storage_.
+Two implementations of checkpoint storage are available - one that persists 
its state snapshots
+to a distributed file system, and another that users the JobManager's heap. 
+
+<center>
+  <table class="table table-bordered">
+    <thead>
+      <tr class="book-hint info">
+        <th class="text-left">Name</th>
+        <th class="text-left">State Backup</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <th class="text-left">FileSystemStateBackend</th>

Review comment:
       ```suggestion
           <th class="text-left">FileSystemCheckpointStorage</th>
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/schema_evolution.md
##########
@@ -105,15 +105,22 @@ Flink fully supports evolving schema of Avro type state, 
as long as the schema c
 One limitation is that Avro generated classes used as the state type cannot be 
relocated or have different
 namespaces when the job is restored.
 
-{{< hint warning >}}
-Schema evolution of keys is not supported.
-{{< /hint >}}
+## Schema Migration Limiations
 
-Example: RocksDB state backend relies on binary objects identity, rather than 
`hashCode` method implementation. Any changes to the keys object structure 
could lead to non deterministic behaviour.  
+Flink's schema migration contains certain limiations to ensure corretness. For 
users that need to work
+around these limitations, and understand them to be safe in their specific 
use-case, consider using
+a [custom serializer]({{< ref 
"docs/dev/datastream/fault-tolerance/custom_serialization" >}}) or the
+[state processor api]({{< ref "docs/libs/state_processor_api" >}}).
 
-{{< hint warning >}}
-**Kryo** cannot be used for schema evolution.  
-{{< /hint >}}
+### Schema evolution of keys is not supported.
+
+The structure of a key cannot be migrated as this may lead to 
non-deterministic behavior. 
+For example, if a POJO is used as a key and one field is dropped then there 
may suddenly be 
+multiple seperate keys that are now identical. Flink has no way to merge the 
corresponding values. 

Review comment:
       ```suggestion
   multiple separate keys that are now identical. Flink has no way to merge the 
corresponding values. 
   ```
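
A small, self-contained Java sketch might make the collision in this paragraph easier to picture. It is not Flink code: `KeyMigrationSketch` and `dropKeyField` are hypothetical names, and keys are modeled as plain lists of field values rather than POJOs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; none of these names are Flink APIs. */
final class KeyMigrationSketch {

    /**
     * Re-keys the state by removing the key field at dropIndex and collects,
     * for each resulting key, every old value that now maps to it.
     */
    static Map<List<String>, List<Long>> dropKeyField(
            Map<List<String>, Long> oldState, int dropIndex) {
        Map<List<String>, List<Long>> migrated = new LinkedHashMap<>();
        for (Map.Entry<List<String>, Long> entry : oldState.entrySet()) {
            List<String> newKey = new ArrayList<>(entry.getKey());
            newKey.remove(dropIndex); // drop one field from the key
            migrated.computeIfAbsent(newKey, k -> new ArrayList<>()).add(entry.getValue());
        }
        return migrated;
    }

    public static void main(String[] args) {
        // Keys are (userId, region); the values stand in for per-key state.
        Map<List<String>, Long> oldState = new LinkedHashMap<>();
        oldState.put(Arrays.asList("alice", "eu"), 3L);
        oldState.put(Arrays.asList("alice", "us"), 7L);
        oldState.put(Arrays.asList("bob", "eu"), 1L);

        // After dropping "region", two formerly separate keys become identical
        // and their values compete for the same slot.
        System.out.println(dropKeyField(oldState, 1));
    }
}
```

Any key that ends up with more than one value in the result is a case where the old entries would have to be merged, and there is no generally correct rule for doing that.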

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be 
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the 
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window 
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as 
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes 
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set 
this feature, users can instantiate a `JobManagerCheckpointStorage` with the 
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+  - The size of each individual state is by default limited to 5 MB. This 
value can be increased in the constructor of the JobManagerCheckpointStorage.

Review comment:
       ```suggestion
     - The size of each individual state is by default limited to 5 MB. This 
value can be increased in the constructor of the `JobManagerCheckpointStorage`.
   ```
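
For readers unfamiliar with this limit, the behaviour could be pictured roughly as in the sketch below. It rests on stated assumptions: `MaxStateSizeSketch` and `checkSize` are hypothetical names and this is not the Flink implementation. Only the 5 MB default is taken from the docs text above.

```java
/** Hypothetical size guard; it models the idea only and is not Flink's class. */
final class MaxStateSizeSketch {

    /** 5 MB, matching the default quoted in the docs text. */
    static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    private final int maxStateSize;

    MaxStateSizeSketch(int maxStateSize) {
        if (maxStateSize <= 0) {
            throw new IllegalArgumentException("maxStateSize must be positive");
        }
        this.maxStateSize = maxStateSize;
    }

    /**
     * Returns the serialized state unchanged if it fits within the limit;
     * otherwise throws, which stands in for failing the checkpoint.
     */
    byte[] checkSize(byte[] serializedState) {
        if (serializedState.length > maxStateSize) {
            throw new IllegalStateException(
                    "State of " + serializedState.length
                            + " bytes exceeds the configured maximum of "
                            + maxStateSize + " bytes");
        }
        return serializedState;
    }

    public static void main(String[] args) {
        MaxStateSizeSketch guard = new MaxStateSizeSketch(DEFAULT_MAX_STATE_SIZE);
        guard.checkSize(new byte[1024]); // small state passes the check
        try {
            guard.checkSize(new byte[DEFAULT_MAX_STATE_SIZE + 1]);
        } catch (IllegalStateException e) {
            System.out.println("checkpoint rejected: " + e.getMessage());
        }
    }
}
```

Failing early like this trades a lost checkpoint for a JobManager that stays up, which is the point of the `OutOfMemoryError` remark in the docs.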

##########
File path: docs/content/docs/learn-flink/fault_tolerance.md
##########
@@ -28,83 +28,108 @@ under the License.
 
 The keyed state managed by Flink is a sort of sharded, key/value store, and 
the working copy of each
 item of keyed state is kept somewhere local to the taskmanager responsible for 
that key. Operator
-state is also local to the machine(s) that need(s) it. Flink periodically 
takes persistent snapshots
-of all the state and copies these snapshots somewhere more durable, such as a 
distributed file
-system.
+state is also local to the machine(s) that need(s) it.
 
-In the event of the failure, Flink can restore the complete state of your 
application and resume
-processing as though nothing had gone wrong.
-
-This state that Flink manages is stored in a _state backend_. Two 
implementations of state backends
-are available -- one based on RocksDB, an embedded key/value store that keeps 
its working state on
+This state that Flink manages is stored in a _state backend_. 
+Two implementations of state backends are available -- one based on RocksDB, 
an embedded key/value store that keeps its working state on
 disk, and another heap-based state backend that keeps its working state in 
memory, on the Java heap.
-This heap-based state backend comes in two flavors: the FsStateBackend that 
persists its state
-snapshots to a distributed file system, and the MemoryStateBackend that uses 
the JobManager's heap.
-
-<table class="table table-bordered">
-  <thead>
-    <tr class="book-hint info">
-      <th class="text-left">Name</th>
-      <th class="text-left">Working State</th>
-      <th class="text-left">State Backup</th>
-      <th class="text-left">Snapshotting</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <th class="text-left">RocksDBStateBackend</th>
-      <td class="text-left">Local disk (tmp dir)</td>
-      <td class="text-left">Distributed file system</td>
-      <td class="text-left">Full / Incremental</td>
-    </tr>
-    <tr>
-      <td colspan="4" class="text-left">
-        <ul>
-          <li>Supports state larger than available memory</li>
-          <li>Rule of thumb: 10x slower than heap-based backends</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <th class="text-left">FsStateBackend</th>
-      <td class="text-left">JVM Heap</td>
-      <td class="text-left">Distributed file system</td>
-      <td class="text-left">Full</td>
-    </tr>
-    <tr>
-      <td colspan="4" class="text-left">
-        <ul>
-          <li>Fast, requires large heap</li>
-          <li>Subject to GC</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <th class="text-left">MemoryStateBackend</th>
-      <td class="text-left">JVM Heap</td>
-      <td class="text-left">JobManager JVM Heap</td>
-      <td class="text-left">Full</td>
-    </tr>
-    <tr>
-      <td colspan="4" class="text-left">
-        <ul>
-          <li>Good for testing and experimentation with small state 
(locally)</li>
-        </ul>
-      </td>
-    </tr>
-  </tbody>
-</table>
+
+<center>
+  <table class="table table-bordered">
+    <thead>
+      <tr class="book-hint info">
+        <th class="text-left">Name</th>
+        <th class="text-left">Working State</th>
+        <th class="text-left">Snapshotting</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <th class="text-left">EmbeddedRocksDBStateBackend</th>
+        <td class="text-left">Local disk (tmp dir)</td>
+        <td class="text-left">Full / Incremental</td>
+      </tr>
+      <tr>
+        <td colspan="4" class="text-left">
+          <ul>
+            <li>Supports state larger than available memory</li>
+            <li>Rule of thumb: 10x slower than heap-based backends</li>
+          </ul>
+        </td>
+      </tr>
+      <tr>
+        <th class="text-left">HashMapStateBackend</th>
+        <td class="text-left">JVM Heap</td>
+        <td class="text-left">Full</td>
+      </tr>
+      <tr>
+        <td colspan="4" class="text-left">
+          <ul>
+            <li>Fast, requires large heap</li>
+            <li>Subject to GC</li>
+          </ul>
+        </td>
+      </tr>
+    </tbody>
+  </table>
+</center>
 
 When working with state kept in a heap-based state backend, accesses and 
updates involve reading and
-writing objects on the heap. But for objects kept in the 
`RocksDBStateBackend`, accesses and updates
+writing objects on the heap. But for objects kept in the 
`EmbeddedRocksDBStateBackend`, accesses and updates
 involve serialization and deserialization, and so are much more expensive. But 
the amount of state
 you can have with RocksDB is limited only by the size of the local disk. Note 
also that only the
-`RocksDBStateBackend` is able to do incremental snapshotting, which is a 
significant benefit for
+`EmbeddedRocksDBStateBackend` is able to do incremental snapshotting, which is 
a significant benefit for
 applications with large amounts of slowly changing state.
 
-All of these state backends are able to do asynchronous snapshotting, meaning 
that they can take a
+Both of these state backends are able to do asynchronous snapshotting, meaning 
that they can take a
 snapshot without impeding the ongoing stream processing.
 
+## Checkpoint Storage
+
+Flink periodically takes persistent snapshots of all the state in every 
operator and copies these snapshots somewhere more durable, such as a 
distributed file system. In the event of the failure, Flink can restore the 
complete state of your application and resume
+processing as though nothing had gone wrong.
+
+The location where these snapshots are stored is defined via the jobs 
_checkpoint storage_.
+Two implementations of checkpoint storage are available - one that persists 
its state snapshots
+to a distributed file system, and another that users the JobManager's heap. 
+
+<center>
+  <table class="table table-bordered">
+    <thead>
+      <tr class="book-hint info">
+        <th class="text-left">Name</th>
+        <th class="text-left">State Backup</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr>
+        <th class="text-left">FileSystemStateBackend</th>
+        <td class="text-left">Distributed file system</td>
+      </tr>
+      <tr>
+        <td colspan="4" class="text-left">
+          <ul>
+            <li>Supports very large state size</li>
+            <li>Highly durable</li>
+            <li>Recomended for most production deployments</li>

Review comment:
       ```suggestion
               <li>Recommended for production deployments</li>
   ```

##########
File path: docs/content/docs/dev/datastream/fault-tolerance/schema_evolution.md
##########
@@ -105,15 +105,22 @@ Flink fully supports evolving schema of Avro type state, 
as long as the schema c
 One limitation is that Avro generated classes used as the state type cannot be 
relocated or have different
 namespaces when the job is restored.
 
-{{< hint warning >}}
-Schema evolution of keys is not supported.
-{{< /hint >}}
+## Schema Migration Limiations
 
-Example: RocksDB state backend relies on binary objects identity, rather than 
`hashCode` method implementation. Any changes to the keys object structure 
could lead to non deterministic behaviour.  
+Flink's schema migration contains certain limiations to ensure corretness. For 
users that need to work
+around these limitations, and understand them to be safe in their specific 
use-case, consider using
+a [custom serializer]({{< ref 
"docs/dev/datastream/fault-tolerance/custom_serialization" >}}) or the
+[state processor api]({{< ref "docs/libs/state_processor_api" >}}).
 
-{{< hint warning >}}
-**Kryo** cannot be used for schema evolution.  
-{{< /hint >}}
+### Schema evolution of keys is not supported.
+
+The structure of a key cannot be migrated as this may lead to 
non-deterministic behavior. 
+For example, if a POJO is used as a key and one field is dropped then there 
may suddenly be 
+multiple seperate keys that are now identical. Flink has no way to merge the 
corresponding values. 
+
+Additionally, RocksDB state backend relies on binary objects identity, rather 
than `hashCode` method implementation. Any changes to the keys object structure 
could lead to non-deterministic behavior.  

Review comment:
       ```suggestion
   Additionally, the RocksDB state backend relies on binary object identity, 
rather than the `hashCode` method. Any change to the keys' object structure can 
lead to non-deterministic behavior.  
   ```
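
If it helps, the "binary identity" point can be illustrated with a self-contained Java sketch. It assumes a simple field-by-field encoding via `DataOutputStream.writeUTF`, which is not what Flink's serializers do. `BinaryKeyIdentitySketch`, `serialize`, and `sameStoredKey` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical illustration of lookups keyed by serialized bytes. */
final class BinaryKeyIdentitySketch {

    /** Serializes the key fields in order, each as a length-prefixed string. */
    static byte[] serialize(String... fields) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String field : fields) {
                out.writeUTF(field);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** A byte-oriented store considers two keys equal only if their bytes match. */
    static boolean sameStoredKey(byte[] a, byte[] b) {
        return Arrays.equals(a, b);
    }

    public static void main(String[] args) {
        byte[] writtenBefore = serialize("alice", "eu"); // old key layout
        byte[] lookedUpAfter = serialize("alice");       // layout after dropping a field

        // The bytes differ, so a lookup with the new layout would miss the old
        // entry, no matter what equals or hashCode say about the Java objects.
        System.out.println(sameStoredKey(writtenBefore, lookedUpAfter));
    }
}
```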

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be 
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the 
JobManager heap.

Review comment:
       ```suggestion
   The *JobManagerCheckpointStorage* stores checkpoint snapshots on the 
JobManager heap.
   ```

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be 
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the 
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window 
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as 
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes 
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set 
this feature, users can instantiate a `JobManagerCheckpointStorage` with the 
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:

Review comment:
       ```suggestion
   Limitations of the `JobManagerCheckpointStorage`:
   ```

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be 
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the 
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window 
operators hold hash tables

Review comment:
       The first part of this sentence is missing.

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be 
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the 
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window 
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as 
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes 
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set 
this feature, users can instantiate a `JobManagerCheckpointStorage` with the 
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+  - The size of each individual state is by default limited to 5 MB. This 
value can be increased in the constructor of the JobManagerCheckpointStorage.
+  - Irrespective of the configured maximal state size, the state cannot be 
larger than the akka frame size (see [Configuration]({{< ref 
"docs/deployment/config" >}})).

Review comment:
       ```suggestion
     - Irrespective of the configured maximal state size, the state cannot be 
larger than the Akka frame size (see [Configuration]({{< ref 
"docs/deployment/config" >}})).
   ```
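
As a reading aid for the two size limits quoted in this hunk, here is a minimal plain-Java sketch of how they might combine. It does not use the Flink API: `StateSizeLimitSketch`, `effectiveLimit`, and `fitsInCheckpoint` are hypothetical names, and the 10 MB frame size in `main` is an assumed value for illustration, not something stated in the diff.

```java
// Hypothetical model of the limits described in the quoted docs; this is not Flink code.
class StateSizeLimitSketch {
    // Default per-state limit stated in the docs text: 5 MB.
    static final long DEFAULT_MAX_STATE_SIZE = 5L * 1024 * 1024;

    // Whatever maximum is configured, the state cannot exceed the Akka frame size,
    // so the effective limit is the smaller of the two values.
    static long effectiveLimit(long configuredMaxStateSize, long akkaFrameSize) {
        return Math.min(configuredMaxStateSize, akkaFrameSize);
    }

    // Returns true if a state of the given size stays within both limits.
    static boolean fitsInCheckpoint(long stateSize, long configuredMaxStateSize, long akkaFrameSize) {
        return stateSize <= effectiveLimit(configuredMaxStateSize, akkaFrameSize);
    }

    public static void main(String[] args) {
        long assumedFrameSize = 10L * 1024 * 1024; // assumed value, for illustration only

        // 4 MB of state under the default 5 MB limit.
        System.out.println(fitsInCheckpoint(4L * 1024 * 1024, DEFAULT_MAX_STATE_SIZE, assumedFrameSize));

        // 12 MB of state with a 64 MB configured limit: the frame size still caps it.
        System.out.println(fitsInCheckpoint(12L * 1024 * 1024, 64L * 1024 * 1024, assumedFrameSize));
    }
}
```

The second call illustrates the point of the bullet under review: raising the configured maximum alone is not enough when the frame size is smaller.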

##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{< ref 
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
 configure checkpoints for your program.
 
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen 
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be 
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the 
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window 
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as 
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes 
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set 
this feature, users can instantiate a `JobManagerCheckpointStorage` with the 
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+  - The size of each individual state is by default limited to 5 MB. This 
value can be increased in the constructor of the JobManagerCheckpointStorage.
+  - Irrespective of the configured maximal state size, the state cannot be 
larger than the akka frame size (see [Configuration]({{< ref 
"docs/deployment/config" >}})).
+  - The aggregate state must fit into the JobManager memory.
+
+The JobManagerCheckpointStorage is encouraged for:
+
+  - Local development and debugging
+  - Jobs that do hold little state, such as jobs that consist only of 
record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer 
requires very little state.
+
+### The FileSystemCheckpointStorage
+
+The *FileSystemCheckpointStorage* is configured with a file system URL (type, 
address, path), such as "hdfs://namenode:40010/flink/checkpoints" or 
"file:///data/flink/checkpoints".
+
+Upon checkpointing, it writes state snapshots into files in the configured 
file system and directory. Minimal metadata is stored in the JobManager's 
memory (or, in high-availability mode, in the metadata checkpoint).
+
+If a checkpoint directory is specified, `FileSystemCheckpointStorage` will be 
used to persist checkpoint snapshots. 
+
+The FileSystemCheckpointStorage is encouraged for:

Review comment:
       ```suggestion
   The `FileSystemCheckpointStorage` is encouraged for:
   ```
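
For readers of this thread, the selection rule stated in the quoted hint (a configured checkpoint directory means `FileSystemCheckpointStorage`, otherwise `JobManagerCheckpointStorage`) can be sketched in plain Java. This is only a model of the documented behavior, not Flink's implementation: `CheckpointStorageChoiceSketch`, `StorageKind`, and `choose` are hypothetical names, and treating a blank directory string as "not configured" is an assumption of this sketch.

```java
import java.util.Optional;

// Hypothetical model of the selection rule from the quoted hint; this is not Flink code.
class CheckpointStorageChoiceSketch {
    enum StorageKind { JOB_MANAGER, FILE_SYSTEM }

    // A configured checkpoint directory selects file system storage;
    // otherwise the JobManager-based storage is used.
    static StorageKind choose(Optional<String> checkpointDirectory) {
        return checkpointDirectory
                .filter(dir -> !dir.trim().isEmpty()) // assumption: blank counts as unset
                .map(dir -> StorageKind.FILE_SYSTEM)
                .orElse(StorageKind.JOB_MANAGER);
    }

    public static void main(String[] args) {
        // Directory taken from the example URL in the docs text.
        System.out.println(choose(Optional.of("hdfs://namenode:40010/flink/checkpoints")));
        // No directory configured.
        System.out.println(choose(Optional.empty()));
    }
}
```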

##########
File path: docs/content/docs/ops/state/savepoints.md
##########
@@ -125,7 +125,7 @@ because the injected path entropy spreads the files over 
many directories. Lacki
 
 Unlike savepoints, checkpoints cannot generally be moved to a different 
location, because checkpoints may include some absolute path references.
 
-If you use the `MemoryStateBackend`, metadata *and* savepoint state will be 
stored in the `_metadata` file, so don't be confused by the absence of 
additional data files.
+If you use the `JobManagerCheckpointStorage`, metadata *and* savepoint state 
will be stored in the `_metadata` file, so don't be confused by the absence of 
additional data files.

Review comment:
       ```suggestion
   If you use `JobManagerCheckpointStorage`, metadata *and* savepoint state 
will be stored in the `_metadata` file, so don't be confused by the absence of 
additional data files.
   ```





