alpinegizmo commented on a change in pull request #14932:
URL: https://github.com/apache/flink/pull/14932#discussion_r582692404
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -179,18 +190,19 @@ Some more parameters and/or defaults may be set via
`conf/flink-conf.yaml` (see
{{< top >}}
-
-## Selecting a State Backend
+## Selecting Checkpoint Storage
Flink's [checkpointing mechanism]({{< ref "docs/learn-flink/fault_tolerance"
>}}) stores consistent snapshots
of all the state in timers and stateful operators, including connectors,
windows, and any [user-defined state](state.html).
Where the checkpoints are stored (e.g., JobManager memory, file system,
database) depends on the configured
-**State Backend**.
+**Checkpoint Storage**.
-By default, state is kept in memory in the TaskManagers and checkpoints are
stored in memory in the JobManager. For proper persistence of large state,
-Flink supports various approaches for storing and checkpointing state in other
state backends. The choice of state backend can be configured via
`StreamExecutionEnvironment.setStateBackend(…)`.
+By default, checkpoints are stored in memory in the JobManager. For proper
persistence of large state,
+Flink supports various approaches for checkpointing state in other locations.
+The choice of checkpoint storag ecan be configured via
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.
+It is highly encouraged that checkpoints are stored in a highly-available
filesystem for most production deployments.
Review comment:
I don't see any reason to equivocate here.
```suggestion
It is strongly encouraged that checkpoints be stored in a highly-available
filesystem for production deployments.
```
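To make the guidance in this hunk concrete, here is a minimal sketch of pointing checkpoint storage at a highly-available filesystem via the `CheckpointConfig` API under review. The HDFS path and interval are placeholders, and this assumes the Flink streaming dependencies are on the classpath:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing, then direct snapshots to a durable filesystem.
// The URI is a placeholder; any supported scheme (hdfs://, s3://, ...) works.
env.enableCheckpointing(10_000); // checkpoint every 10 seconds
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
```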
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -179,18 +190,19 @@ Some more parameters and/or defaults may be set via
`conf/flink-conf.yaml` (see
{{< top >}}
-
-## Selecting a State Backend
+## Selecting Checkpoint Storage
Flink's [checkpointing mechanism]({{< ref "docs/learn-flink/fault_tolerance"
>}}) stores consistent snapshots
of all the state in timers and stateful operators, including connectors,
windows, and any [user-defined state](state.html).
Where the checkpoints are stored (e.g., JobManager memory, file system,
database) depends on the configured
-**State Backend**.
+**Checkpoint Storage**.
-By default, state is kept in memory in the TaskManagers and checkpoints are
stored in memory in the JobManager. For proper persistence of large state,
-Flink supports various approaches for storing and checkpointing state in other
state backends. The choice of state backend can be configured via
`StreamExecutionEnvironment.setStateBackend(…)`.
+By default, checkpoints are stored in memory in the JobManager. For proper
persistence of large state,
+Flink supports various approaches for checkpointing state in other locations.
+The choice of checkpoint storag ecan be configured via
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.
Review comment:
```suggestion
The choice of checkpoint storage can be configured via
`StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`.
```
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
Review comment:
```suggestion
When checkpointing is enabled, managed state is persisted to ensure
consistent recovery in case of failures.
Where the state is persisted during checkpointing depends on the chosen
**Checkpoint Storage**.
```
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set
this feature, users can instantiate a `JobManagerCheckpointStorage` with the
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+ - The size of each individual state is by default limited to 5 MB. This
value can be increased in the constructor of the JobManagerCheckpointStorage.
+ - Irrespective of the configured maximal state size, the state cannot be
larger than the akka frame size (see [Configuration]({{< ref
"docs/deployment/config" >}})).
+ - The aggregate state must fit into the JobManager memory.
+
+The JobManagerCheckpointStorage is encouraged for:
+
+ - Local development and debugging
+ - Jobs that do hold little state, such as jobs that consist only of
record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer
requires very little state.
Review comment:
Are there really production use cases where using
JobManagerCheckpointStorage makes sense? Isn't the checkpoint data lost if the
JM fails? Or does HA do something to robustly persist this data?
```suggestion
- Jobs that use very little state, such as jobs that consist only of
record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer
requires very little state.
```
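A sketch of the size-capped configuration the quoted text describes. `MAX_MEM_STATE_SIZE` in the quoted snippet is a user-chosen byte count, not a Flink constant; the 10 MB value below is illustrative:

```java
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Fail checkpoints larger than 10 MB rather than risk a JobManager OOM.
int maxMemStateSize = 10 * 1024 * 1024;
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage(maxMemStateSize));
```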
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -89,10 +147,19 @@ The checkpoint directory is not part of a public API and
can be changed in the f
state.checkpoints.dir: hdfs:///checkpoints/
```
-#### Configure for per job when constructing the state backend
+#### Configure for per job on the checkpoint configuration
+
+```java
+env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
+```
+
+#### Configure with checkpoint storage instance
+
+Alternativly, checkpoint storage can be set by specifying the desired
checkpoint storage instance which allows for setting low level configurations
such as write buffer sizes.
Review comment:
```suggestion
Alternatively, checkpoint storage can be set by specifying the desired
checkpoint storage instance, which allows for setting low-level configurations
such as write buffer sizes.
```
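A sketch of the instance-based configuration this hunk mentions, assuming the Flink 1.13 `FileSystemCheckpointStorage` class; the path is a placeholder:

```java
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Equivalent to setCheckpointStorage("hdfs:///checkpoints-data/"), but an
// explicit instance exposes lower-level knobs (e.g. the small-state file
// threshold); check the release Javadoc for the exact constructor overloads.
env.getCheckpointConfig().setCheckpointStorage(
    new FileSystemCheckpointStorage("hdfs:///checkpoints-data/"));
```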
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/schema_evolution.md
##########
@@ -105,15 +105,22 @@ Flink fully supports evolving schema of Avro type state,
as long as the schema c
One limitation is that Avro generated classes used as the state type cannot be
relocated or have different
namespaces when the job is restored.
-{{< hint warning >}}
-Schema evolution of keys is not supported.
-{{< /hint >}}
+## Schema Migration Limiations
-Example: RocksDB state backend relies on binary objects identity, rather than
`hashCode` method implementation. Any changes to the keys object structure
could lead to non deterministic behaviour.
+Flink's schema migration contains certain limiations to ensure corretness. For
users that need to work
Review comment:
```suggestion
Flink's schema migration has some limitations that are required to ensure
correctness. For users that need to work
```
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
##########
@@ -50,6 +50,8 @@ By default, checkpointing is disabled. To enable
checkpointing, call `enableChec
Other parameters for checkpointing include:
+ - *checkpoint storage*: You can set the location where checkpoints snapshots
are made durable. By default Flink will use the JobManager's heap. For
production deployments it is recomended to instead use a durable filesystem.
See [checkpoint storage]({{< ref
"docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the
available options for job-wide and cluster-wide configuration.
Review comment:
```suggestion
- *checkpoint storage*: You can set the location where checkpoint
snapshots are made durable. By default Flink will use the JobManager's heap.
For production deployments it is recommended to instead use a durable
filesystem. See [checkpoint storage]({{< ref
"docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the
available options for job-wide and cluster-wide configuration.
```
##########
File path: docs/content/docs/learn-flink/fault_tolerance.md
##########
@@ -28,83 +28,108 @@ under the License.
The keyed state managed by Flink is a sort of sharded, key/value store, and
the working copy of each
item of keyed state is kept somewhere local to the taskmanager responsible for
that key. Operator
-state is also local to the machine(s) that need(s) it. Flink periodically
takes persistent snapshots
-of all the state and copies these snapshots somewhere more durable, such as a
distributed file
-system.
+state is also local to the machine(s) that need(s) it.
-In the event of the failure, Flink can restore the complete state of your
application and resume
-processing as though nothing had gone wrong.
-
-This state that Flink manages is stored in a _state backend_. Two
implementations of state backends
-are available -- one based on RocksDB, an embedded key/value store that keeps
its working state on
+This state that Flink manages is stored in a _state backend_.
+Two implementations of state backends are available -- one based on RocksDB,
an embedded key/value store that keeps its working state on
disk, and another heap-based state backend that keeps its working state in
memory, on the Java heap.
-This heap-based state backend comes in two flavors: the FsStateBackend that
persists its state
-snapshots to a distributed file system, and the MemoryStateBackend that uses
the JobManager's heap.
-
-<table class="table table-bordered">
- <thead>
- <tr class="book-hint info">
- <th class="text-left">Name</th>
- <th class="text-left">Working State</th>
- <th class="text-left">State Backup</th>
- <th class="text-left">Snapshotting</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <th class="text-left">RocksDBStateBackend</th>
- <td class="text-left">Local disk (tmp dir)</td>
- <td class="text-left">Distributed file system</td>
- <td class="text-left">Full / Incremental</td>
- </tr>
- <tr>
- <td colspan="4" class="text-left">
- <ul>
- <li>Supports state larger than available memory</li>
- <li>Rule of thumb: 10x slower than heap-based backends</li>
- </ul>
- </td>
- </tr>
- <tr>
- <th class="text-left">FsStateBackend</th>
- <td class="text-left">JVM Heap</td>
- <td class="text-left">Distributed file system</td>
- <td class="text-left">Full</td>
- </tr>
- <tr>
- <td colspan="4" class="text-left">
- <ul>
- <li>Fast, requires large heap</li>
- <li>Subject to GC</li>
- </ul>
- </td>
- </tr>
- <tr>
- <th class="text-left">MemoryStateBackend</th>
- <td class="text-left">JVM Heap</td>
- <td class="text-left">JobManager JVM Heap</td>
- <td class="text-left">Full</td>
- </tr>
- <tr>
- <td colspan="4" class="text-left">
- <ul>
- <li>Good for testing and experimentation with small state
(locally)</li>
- </ul>
- </td>
- </tr>
- </tbody>
-</table>
+
+<center>
+ <table class="table table-bordered">
+ <thead>
+ <tr class="book-hint info">
+ <th class="text-left">Name</th>
+ <th class="text-left">Working State</th>
+ <th class="text-left">Snapshotting</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th class="text-left">EmbeddedRocksDBStateBackend</th>
+ <td class="text-left">Local disk (tmp dir)</td>
+ <td class="text-left">Full / Incremental</td>
+ </tr>
+ <tr>
+ <td colspan="4" class="text-left">
+ <ul>
+ <li>Supports state larger than available memory</li>
+ <li>Rule of thumb: 10x slower than heap-based backends</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <th class="text-left">HashMapStateBackend</th>
+ <td class="text-left">JVM Heap</td>
+ <td class="text-left">Full</td>
+ </tr>
+ <tr>
+ <td colspan="4" class="text-left">
+ <ul>
+ <li>Fast, requires large heap</li>
+ <li>Subject to GC</li>
+ </ul>
+ </td>
+ </tr>
+ </tbody>
+ </table>
+</center>
When working with state kept in a heap-based state backend, accesses and
updates involve reading and
-writing objects on the heap. But for objects kept in the
`RocksDBStateBackend`, accesses and updates
+writing objects on the heap. But for objects kept in the
`EmbeddedRocksDBStateBackend`, accesses and updates
involve serialization and deserialization, and so are much more expensive. But
the amount of state
you can have with RocksDB is limited only by the size of the local disk. Note
also that only the
-`RocksDBStateBackend` is able to do incremental snapshotting, which is a
significant benefit for
+`EmbeddedRocksDBStateBackend` is able to do incremental snapshotting, which is
a significant benefit for
applications with large amounts of slowly changing state.
-All of these state backends are able to do asynchronous snapshotting, meaning
that they can take a
+Both of these state backends are able to do asynchronous snapshotting, meaning
that they can take a
snapshot without impeding the ongoing stream processing.
+## Checkpoint Storage
+
+Flink periodically takes persistent snapshots of all the state in every
operator and copies these snapshots somewhere more durable, such as a
distributed file system. In the event of the failure, Flink can restore the
complete state of your application and resume
+processing as though nothing had gone wrong.
+
+The location where these snapshots are stored is defined via the jobs
_checkpoint storage_.
+Two implementations of checkpoint storage are available - one that persists
its state snapshots
+to a distributed file system, and another that users the JobManager's heap.
+
+<center>
+ <table class="table table-bordered">
+ <thead>
+ <tr class="book-hint info">
+ <th class="text-left">Name</th>
+ <th class="text-left">State Backup</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th class="text-left">FileSystemStateBackend</th>
Review comment:
```suggestion
<th class="text-left">FileSystemCheckpointStorage</th>
```
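As a hedged sketch of how the two state backends in the quoted table are selected (class and package names per the Flink 1.13 API this PR documents; assumes Flink streaming and the RocksDB state backend artifact are on the classpath):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Heap-based working state: fast, limited by the JVM heap, full snapshots only.
env.setStateBackend(new HashMapStateBackend());

// Or: RocksDB working state on local disk, supports incremental snapshots.
env.setStateBackend(new EmbeddedRocksDBStateBackend(/* enableIncrementalCheckpointing = */ true));
```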
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/schema_evolution.md
##########
@@ -105,15 +105,22 @@ Flink fully supports evolving schema of Avro type state,
as long as the schema c
One limitation is that Avro generated classes used as the state type cannot be
relocated or have different
namespaces when the job is restored.
-{{< hint warning >}}
-Schema evolution of keys is not supported.
-{{< /hint >}}
+## Schema Migration Limiations
-Example: RocksDB state backend relies on binary objects identity, rather than
`hashCode` method implementation. Any changes to the keys object structure
could lead to non deterministic behaviour.
+Flink's schema migration contains certain limiations to ensure corretness. For
users that need to work
+around these limitations, and understand them to be safe in their specific
use-case, consider using
+a [custom serializer]({{< ref
"docs/dev/datastream/fault-tolerance/custom_serialization" >}}) or the
+[state processor api]({{< ref "docs/libs/state_processor_api" >}}).
-{{< hint warning >}}
-**Kryo** cannot be used for schema evolution.
-{{< /hint >}}
+### Schema evolution of keys is not supported.
+
+The structure of a key cannot be migrated as this may lead to
non-deterministic behavior.
+For example, if a POJO is used as a key and one field is dropped then there
may suddenly be
+multiple seperate keys that are now identical. Flink has no way to merge the
corresponding values.
Review comment:
```suggestion
multiple separate keys that are now identical. Flink has no way to merge the
corresponding values.
```
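The hazard described in this hunk can be demonstrated with plain Java, independent of Flink. The class below is hypothetical: it models a key after a schema change dropped the `region` field from the key's identity, so two formerly distinct keys now collide and one value silently overwrites the other:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class KeyCollision {
    // Hypothetical key after a schema change: only `userId` participates in
    // equals/hashCode; `region` was dropped from the key's identity.
    static final class UserKey {
        final String userId;
        final String region;

        UserKey(String userId, String region) {
            this.userId = userId;
            this.region = region;
        }

        @Override public boolean equals(Object o) {
            return o instanceof UserKey && ((UserKey) o).userId.equals(userId);
        }
        @Override public int hashCode() { return Objects.hash(userId); }
    }

    public static void main(String[] args) {
        Map<UserKey, Long> counts = new HashMap<>();
        // Before the field was dropped these were two distinct keys;
        // now they collide, and the second put overwrites the first.
        counts.put(new UserKey("u1", "eu"), 10L);
        counts.put(new UserKey("u1", "us"), 20L);

        if (counts.size() != 1) {
            throw new AssertionError("expected the two keys to collapse into one entry");
        }
        System.out.println(counts.size()); // prints 1
    }
}
```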
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set
this feature, users can instantiate a `JobManagerCheckpointStorage` with the
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+ - The size of each individual state is by default limited to 5 MB. This
value can be increased in the constructor of the JobManagerCheckpointStorage.
Review comment:
```suggestion
- The size of each individual state is by default limited to 5 MB. This
value can be increased in the constructor of the `JobManagerCheckpointStorage`.
```
##########
File path: docs/content/docs/learn-flink/fault_tolerance.md
##########
@@ -28,83 +28,108 @@ under the License.
The keyed state managed by Flink is a sort of sharded, key/value store, and
the working copy of each
item of keyed state is kept somewhere local to the taskmanager responsible for
that key. Operator
-state is also local to the machine(s) that need(s) it. Flink periodically
takes persistent snapshots
-of all the state and copies these snapshots somewhere more durable, such as a
distributed file
-system.
+state is also local to the machine(s) that need(s) it.
-In the event of the failure, Flink can restore the complete state of your
application and resume
-processing as though nothing had gone wrong.
-
-This state that Flink manages is stored in a _state backend_. Two
implementations of state backends
-are available -- one based on RocksDB, an embedded key/value store that keeps
its working state on
+This state that Flink manages is stored in a _state backend_.
+Two implementations of state backends are available -- one based on RocksDB,
an embedded key/value store that keeps its working state on
disk, and another heap-based state backend that keeps its working state in
memory, on the Java heap.
-This heap-based state backend comes in two flavors: the FsStateBackend that
persists its state
-snapshots to a distributed file system, and the MemoryStateBackend that uses
the JobManager's heap.
-
-<table class="table table-bordered">
- <thead>
- <tr class="book-hint info">
- <th class="text-left">Name</th>
- <th class="text-left">Working State</th>
- <th class="text-left">State Backup</th>
- <th class="text-left">Snapshotting</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <th class="text-left">RocksDBStateBackend</th>
- <td class="text-left">Local disk (tmp dir)</td>
- <td class="text-left">Distributed file system</td>
- <td class="text-left">Full / Incremental</td>
- </tr>
- <tr>
- <td colspan="4" class="text-left">
- <ul>
- <li>Supports state larger than available memory</li>
- <li>Rule of thumb: 10x slower than heap-based backends</li>
- </ul>
- </td>
- </tr>
- <tr>
- <th class="text-left">FsStateBackend</th>
- <td class="text-left">JVM Heap</td>
- <td class="text-left">Distributed file system</td>
- <td class="text-left">Full</td>
- </tr>
- <tr>
- <td colspan="4" class="text-left">
- <ul>
- <li>Fast, requires large heap</li>
- <li>Subject to GC</li>
- </ul>
- </td>
- </tr>
- <tr>
- <th class="text-left">MemoryStateBackend</th>
- <td class="text-left">JVM Heap</td>
- <td class="text-left">JobManager JVM Heap</td>
- <td class="text-left">Full</td>
- </tr>
- <tr>
- <td colspan="4" class="text-left">
- <ul>
- <li>Good for testing and experimentation with small state
(locally)</li>
- </ul>
- </td>
- </tr>
- </tbody>
-</table>
+
+<center>
+ <table class="table table-bordered">
+ <thead>
+ <tr class="book-hint info">
+ <th class="text-left">Name</th>
+ <th class="text-left">Working State</th>
+ <th class="text-left">Snapshotting</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th class="text-left">EmbeddedRocksDBStateBackend</th>
+ <td class="text-left">Local disk (tmp dir)</td>
+ <td class="text-left">Full / Incremental</td>
+ </tr>
+ <tr>
+ <td colspan="4" class="text-left">
+ <ul>
+ <li>Supports state larger than available memory</li>
+ <li>Rule of thumb: 10x slower than heap-based backends</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <th class="text-left">HashMapStateBackend</th>
+ <td class="text-left">JVM Heap</td>
+ <td class="text-left">Full</td>
+ </tr>
+ <tr>
+ <td colspan="4" class="text-left">
+ <ul>
+ <li>Fast, requires large heap</li>
+ <li>Subject to GC</li>
+ </ul>
+ </td>
+ </tr>
+ </tbody>
+ </table>
+</center>
When working with state kept in a heap-based state backend, accesses and
updates involve reading and
-writing objects on the heap. But for objects kept in the
`RocksDBStateBackend`, accesses and updates
+writing objects on the heap. But for objects kept in the
`EmbeddedRocksDBStateBackend`, accesses and updates
involve serialization and deserialization, and so are much more expensive. But
the amount of state
you can have with RocksDB is limited only by the size of the local disk. Note
also that only the
-`RocksDBStateBackend` is able to do incremental snapshotting, which is a
significant benefit for
+`EmbeddedRocksDBStateBackend` is able to do incremental snapshotting, which is
a significant benefit for
applications with large amounts of slowly changing state.
-All of these state backends are able to do asynchronous snapshotting, meaning
that they can take a
+Both of these state backends are able to do asynchronous snapshotting, meaning
that they can take a
snapshot without impeding the ongoing stream processing.
+## Checkpoint Storage
+
+Flink periodically takes persistent snapshots of all the state in every
operator and copies these snapshots somewhere more durable, such as a
distributed file system. In the event of the failure, Flink can restore the
complete state of your application and resume
+processing as though nothing had gone wrong.
+
+The location where these snapshots are stored is defined via the jobs
_checkpoint storage_.
+Two implementations of checkpoint storage are available - one that persists
its state snapshots
+to a distributed file system, and another that users the JobManager's heap.
+
+<center>
+ <table class="table table-bordered">
+ <thead>
+ <tr class="book-hint info">
+ <th class="text-left">Name</th>
+ <th class="text-left">State Backup</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th class="text-left">FileSystemStateBackend</th>
+ <td class="text-left">Distributed file system</td>
+ </tr>
+ <tr>
+ <td colspan="4" class="text-left">
+ <ul>
+ <li>Supports very large state size</li>
+ <li>Highly durable</li>
+ <li>Recomended for most production deployments</li>
Review comment:
```suggestion
<li>Recommended for production deployments</li>
```
##########
File path: docs/content/docs/dev/datastream/fault-tolerance/schema_evolution.md
##########
@@ -105,15 +105,22 @@ Flink fully supports evolving schema of Avro type state,
as long as the schema c
One limitation is that Avro generated classes used as the state type cannot be
relocated or have different
namespaces when the job is restored.
-{{< hint warning >}}
-Schema evolution of keys is not supported.
-{{< /hint >}}
+## Schema Migration Limiations
-Example: RocksDB state backend relies on binary objects identity, rather than
`hashCode` method implementation. Any changes to the keys object structure
could lead to non deterministic behaviour.
+Flink's schema migration contains certain limiations to ensure corretness. For
users that need to work
+around these limitations, and understand them to be safe in their specific
use-case, consider using
+a [custom serializer]({{< ref
"docs/dev/datastream/fault-tolerance/custom_serialization" >}}) or the
+[state processor api]({{< ref "docs/libs/state_processor_api" >}}).
-{{< hint warning >}}
-**Kryo** cannot be used for schema evolution.
-{{< /hint >}}
+### Schema evolution of keys is not supported.
+
+The structure of a key cannot be migrated as this may lead to
non-deterministic behavior.
+For example, if a POJO is used as a key and one field is dropped then there
may suddenly be
+multiple seperate keys that are now identical. Flink has no way to merge the
corresponding values.
+
+Additionally, RocksDB state backend relies on binary objects identity, rather
than `hashCode` method implementation. Any changes to the keys object structure
could lead to non-deterministic behavior.
Review comment:
```suggestion
Additionally, the RocksDB state backend relies on binary object identity,
rather than the `hashCode` method. Any change to the keys' object structure can
lead to non-deterministic behavior.
```
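The binary-identity-versus-`hashCode` distinction in this suggestion can be shown with plain Java serialization as a stand-in for a backend's binary key encoding (the `Key` class here is hypothetical): two objects that are equal by `equals`/`hashCode` still serialize to different bytes, so a store keyed on binary identity treats them as different keys.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class BinaryIdentity {
    // Hypothetical key type: `label` is ignored by equals/hashCode,
    // but it is still written by Java serialization.
    static final class Key implements Serializable {
        final int id;
        final String label;
        Key(int id, String label) { this.id = id; this.label = label; }
        @Override public boolean equals(Object o) { return o instanceof Key && ((Key) o).id == id; }
        @Override public int hashCode() { return id; }
    }

    static byte[] bytes(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        Key a = new Key(1, "old-layout");
        Key b = new Key(1, "new-layout");
        boolean sameByEquals = a.equals(b);                     // true
        boolean sameBytes = Arrays.equals(bytes(a), bytes(b));  // false
        if (!sameByEquals || sameBytes) {
            throw new AssertionError("expected equals-true but bytes-different");
        }
        // A store keyed on binary identity sees a and b as *different* keys.
        System.out.println(sameByEquals + " " + sameBytes); // prints: true false
    }
}
```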
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the
JobManager heap.
Review comment:
```suggestion
The *JobManagerCheckpointStorage* stores checkpoint snapshots on the
JobManager heap.
```
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The MemoryStateBackend can be configured to fail the checkpoint if it goes
over a certain size to avoid `OutOfMemoryError`'s on the JobManager. To set
this feature, users can instantiate a `JobManagerCheckpointStorage` with the
corresponding max size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
Review comment:
```suggestion
Limitations of the `JobManagerCheckpointStorage`:
```
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window
operators hold hash tables
Review comment:
The first part of this sentence is missing.
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
+
+## Available State Backends
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured `FileSystemCheckpointStorage` will be
used, otherwise the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on in the
JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The `JobManagerCheckpointStorage` can be configured to fail the checkpoint if it goes over a certain size, to avoid an `OutOfMemoryError` on the JobManager. To enable this feature, users can instantiate a `JobManagerCheckpointStorage` with the corresponding maximum size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+ - The size of each individual state is by default limited to 5 MB. This
value can be increased in the constructor of the JobManagerCheckpointStorage.
+ - Irrespective of the configured maximal state size, the state cannot be
larger than the akka frame size (see [Configuration]({{< ref
"docs/deployment/config" >}})).
Review comment:
```suggestion
- Irrespective of the configured maximal state size, the state cannot be
larger than the Akka frame size (see [Configuration]({{< ref
"docs/deployment/config" >}})).
```
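Since the Akka frame size limit comes up in this hunk, a configuration sketch may help readers. The key below is the one the linked configuration page documents for bounding JobManager/TaskManager message sizes; the value shown is illustrative, not a recommendation:
```yaml
# Illustrative conf/flink-conf.yaml fragment (value is an example, not a tuned default).
# akka.framesize bounds the size of messages exchanged between the JobManager and
# TaskManagers, and therefore also caps checkpoint state carried in the
# acknowledgement messages that JobManagerCheckpointStorage relies on.
akka.framesize: 10485760b
```
Raising `akka.framesize` only moves the ceiling; the aggregate state still has to fit into JobManager memory, as the limitations list notes.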
##########
File path: docs/content/docs/ops/state/checkpoints.md
##########
@@ -35,6 +35,64 @@ the same semantics as a failure-free execution.
See [Checkpointing]({{< ref
"docs/dev/datastream/fault-tolerance/checkpointing" >}}) for how to enable and
configure checkpoints for your program.
+## Checkpoint Storage
+
+When checkpointing is activated, such state is persisted upon checkpoints to
guard against data loss and recover consistently.
+Where the state is persisted upon checkpoints depends on the chosen
**Checkpoint Storage**.
+
+## Available Checkpoint Storage Types
+
+Out of the box, Flink bundles these checkpoint storage types:
+
+ - *JobManagerCheckpointStorage*
+ - *FileSystemCheckpointStorage*
+
+{{< hint info >}}
+If a checkpoint directory is configured, `FileSystemCheckpointStorage` will be used; otherwise, the system will use the `JobManagerCheckpointStorage`.
+{{< /hint >}}
+
+### The JobManagerCheckpointStorage
+
+The *JobManagerCheckpointStorage* stores checkpoint snapshots on the JobManager heap.
+
+holds data internally as objects on the Java heap. Key/value state and window
operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as
part of the checkpoint acknowledgement messages to the
+JobManager, which stores it on its heap as well.
+
+The `JobManagerCheckpointStorage` can be configured to fail the checkpoint if it goes over a certain size, to avoid an `OutOfMemoryError` on the JobManager. To enable this feature, users can instantiate a `JobManagerCheckpointStorage` with the corresponding maximum size:
+
+```java
+new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
+```
+
+Limitations of the JobManagerCheckpointStorage:
+
+ - The size of each individual state is by default limited to 5 MB. This
value can be increased in the constructor of the JobManagerCheckpointStorage.
+ - Irrespective of the configured maximal state size, the state cannot be
larger than the akka frame size (see [Configuration]({{< ref
"docs/deployment/config" >}})).
+ - The aggregate state must fit into the JobManager memory.
+
+The JobManagerCheckpointStorage is encouraged for:
+
+ - Local development and debugging
+ - Jobs that hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer requires very little state.
+
+### The FileSystemCheckpointStorage
+
+The *FileSystemCheckpointStorage* is configured with a file system URL (type,
address, path), such as "hdfs://namenode:40010/flink/checkpoints" or
"file:///data/flink/checkpoints".
+
+Upon checkpointing, it writes state snapshots into files in the configured
file system and directory. Minimal metadata is stored in the JobManager's
memory (or, in high-availability mode, in the metadata checkpoint).
+
+If a checkpoint directory is specified, `FileSystemCheckpointStorage` will be
used to persist checkpoint snapshots.
+
+The FileSystemCheckpointStorage is encouraged for:
Review comment:
```suggestion
The `FileSystemCheckpointStorage` is encouraged for:
```
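To complement the prose in this hunk, a configuration sketch might be worth adding. Assuming the checkpoint-directory key behaves as the hunk describes (the key name below is taken from the existing configuration reference; the URL is the example from the hunk), `FileSystemCheckpointStorage` is selected simply by configuring a checkpoint directory in `conf/flink-conf.yaml`:
```yaml
# Illustrative conf/flink-conf.yaml fragment.
# Configuring a checkpoint directory causes FileSystemCheckpointStorage to be
# used for checkpoint snapshots, as described in the hint above.
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
```
The same choice can be made programmatically via `StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`, as the checkpointing page in this PR describes.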
##########
File path: docs/content/docs/ops/state/savepoints.md
##########
@@ -125,7 +125,7 @@ because the injected path entropy spreads the files over
many directories. Lacki
Unlike savepoints, checkpoints cannot generally be moved to a different
location, because checkpoints may include some absolute path references.
-If you use the `MemoryStateBackend`, metadata *and* savepoint state will be
stored in the `_metadata` file, so don't be confused by the absence of
additional data files.
+If you use the `JobManagerCheckpointStorage`, metadata *and* savepoint state
will be stored in the `_metadata` file, so don't be confused by the absence of
additional data files.
Review comment:
```suggestion
If you use `JobManagerCheckpointStorage`, metadata *and* savepoint state
will be stored in the `_metadata` file, so don't be confused by the absence of
additional data files.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]