curcur commented on a change in pull request #18431:
URL: https://github.com/apache/flink/pull/18431#discussion_r790412520



##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs

Review comment:
       Usually, if we do not have a corresponding Chinese version, we copy the English version into the Chinese docs and open a translation ticket, right?
   
   The ticket can then be picked up by anyone.

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,
+even with Incremental checkpoints, large deployments tend to have at least one 
task in every checkpoint that uploads a
+lot of data (e.g. after compaction).

Review comment:
       1. Add some context here:
   Current Incremental Checkpoints depend on the implementation of the particular state backend. For example, for RocksDB, compaction happens when ...
   
   2. Explain a bit why compaction is bad:
   Compaction may cause more data to be uploaded, and more time to upload ...
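   
   Just as a reference for the kind of context I mean (not suggested wording; the bucket name is a placeholder and this is only an illustrative sketch), incremental checkpoints are enabled via configuration roughly like this:
   
   ```yaml
   # flink-conf.yaml sketch: incremental snapshots with the RocksDB backend
   state.backend: rocksdb
   # only some backends (e.g. RocksDB) support incremental snapshots; others ignore this flag
   state.backend.incremental: true
   state.checkpoints.dir: s3://<bucket-name>/checkpoints
   ```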

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,
+even with Incremental checkpoints, large deployments tend to have at least one 
task in every checkpoint that uploads a
+lot of data (e.g. after compaction).
+
+With Changelog enabled, Flink uploads state changes continuously, forming a 
changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. Independently, configured state 
backend is checkpointed in the
+background periodically. Upon successful upload, changelog is truncated.
+
+As a result, asynchronous phase is reduced, as well as synchronous phase (in 
particular, long-tail).
+
+On the flip side, resource usage is higher:
+
+- more files are created on DFS
+- more IO bandwidth is used to upload

Review comment:
       "is used to upload" => "is used to upload state changes / changelogs"

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,
+even with Incremental checkpoints, large deployments tend to have at least one 
task in every checkpoint that uploads a
+lot of data (e.g. after compaction).
+
+With Changelog enabled, Flink uploads state changes continuously, forming a 
changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. Independently, configured state 
backend is checkpointed in the
+background periodically. Upon successful upload, changelog is truncated.
+
+As a result, asynchronous phase is reduced, as well as synchronous phase (in 
particular, long-tail).

Review comment:
       "as well as synchronous phase (in particular, long-tail)"
   
   Could you explain a bit more. Can not infer directly from what stated above.

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,

Review comment:
       "latter: denoting the second or second mentioned of two people or 
things."

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,
+even with Incremental checkpoints, large deployments tend to have at least one 
task in every checkpoint that uploads a
+lot of data (e.g. after compaction).
+
+With Changelog enabled, Flink uploads state changes continuously, forming a 
changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. Independently, configured state 
backend is checkpointed in the

Review comment:
       "is checkpointed" => "is snapshotted"
   
   It may confuse people with the normal Flink checkpointing procedure.

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,
+even with Incremental checkpoints, large deployments tend to have at least one 
task in every checkpoint that uploads a
+lot of data (e.g. after compaction).
+
+With Changelog enabled, Flink uploads state changes continuously, forming a 
changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. Independently, configured state 
backend is checkpointed in the
+background periodically. Upon successful upload, changelog is truncated.
+
+As a result, asynchronous phase is reduced, as well as synchronous phase (in 
particular, long-tail).
+
+On the flip side, resource usage is higher:
+
+- more files are created on DFS
+- more IO bandwidth is used to upload
+- more CPU used to serialize state changes
+- more memory used by Task Managers to buffer state changes
+- todo: more details after testing, maybe link to blogpost
+
+Recovery time is another thing to consider. Depending on the 
`state.backend.changelog.periodic-materialize.interval`,
+changelog can become lengthy and replaying it may take more time. However, 
recovery time combined with checkpoint
+duration will likely be still lower than in non-changelog setup, providing 
lower end-to-end latency even in failover
+case.
+
+For more details, see 
[FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints).
+
+### Installation
+
+Changelog jars are included into the standard Flink distribution.
+
+Please make sure to [add]({{< ref "docs/deployment/filesystems/overview" >}}) 
the necessary filesystem plugins.
+
+### Configuration
+
+An example configuration in yaml:
+```yaml
+state.backend.changelog.enabled: true
+state.backend.changelog.storage: filesystem # currently, only filesystem and 
memory (for tests) are supported
+dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
+```
+
+Please keep the following defaults (see [limitations](#limitations)):
+```yaml
+execution.checkpointing.max-concurrent-checkpoints: 1
+state.backend.local-recovery: false
+```
+
+Please refer to [configuration reference]({{< ref 
"docs/deployment/config#state-changelog-options" >}}) for other options.
+
+Changelog can also be enabled or disabled per-job programmatically:
+{{< tabs  >}}
+{{< tab "Java" >}}
+```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableChangelogStateBackend(true);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableChangelogStateBackend(true)
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.enable_changelog_statebackend(true)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Monitoring
+
+Available metrics are listed [here]({{< ref "docs/ops/metrics#changelog" >}}).
+
+In the UI, if a task is back-pressured by writing state changes, it will be 
shown as busy (red).
+
+### Upgrading existing jobs
+
+**Enabling Changelog**
+
+Resuming from both savepoints and checkpoints is supported:
+- given an existing non-changelog job
+- take either a [savepoint]({{< ref 
"docs/ops/state/savepoints#resuming-from-savepoints" >}}) or a [checkpoint]({{< 
ref "docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+- alter configuration (enable Changelog)
+- resume from the taken snapshot
+
+**Disabling Changelog**
+Resuming only from [savepoints]({{< ref 
"docs/ops/state/savepoints#resuming-from-savepoints" >}})
+is supported. Resuming from [checkpoints]({{<  ref 
"docs/ops/state/checkpoints#resuming-from-a-retained-checkpoint" >}})
+is planned in the future versions.
+
+**State migration** (including changing TTL) is currently not supported

Review comment:
       Include these in the limitations section?

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,
+even with Incremental checkpoints, large deployments tend to have at least one 
task in every checkpoint that uploads a
+lot of data (e.g. after compaction).
+
+With Changelog enabled, Flink uploads state changes continuously, forming a 
changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. Independently, configured state 
backend is checkpointed in the
+background periodically. Upon successful upload, changelog is truncated.
+
+As a result, asynchronous phase is reduced, as well as synchronous phase (in 
particular, long-tail).
+
+On the flip side, resource usage is higher:
+
+- more files are created on DFS
+- more IO bandwidth is used to upload
+- more CPU used to serialize state changes
+- more memory used by Task Managers to buffer state changes
+- todo: more details after testing, maybe link to blogpost

Review comment:
       Marking this here so that the todo is removed in the final version.

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots

Review comment:
       The snapshot creation itself is still synchronous, isn't it? Is "addressed by Asynchronous snapshots" a typo? I am a bit confused here.

##########
File path: docs/content/docs/ops/state/state_backends.md
##########
@@ -325,6 +325,126 @@ public class MyOptionsFactory implements 
ConfigurableRocksDBOptionsFactory {
 
 {{< top >}}
 
+## Enabling Changelog
+
+// todo: Chinese version of all changed docs
+
+// todo: mention in [large state tuning]({{< ref 
"docs/ops/state/large_state_tuning" >}})? or 1.16?
+
+{{< hint warning >}} The feature is in experimental status. {{< /hint >}}
+
+{{< hint warning >}} Enabling Changelog may have a negative performance impact 
on your application (see below). {{< /hint >}}
+
+### Introduction
+
+Changelog is a feature that aims to decrease checkpointing time, and therefore 
end-to-end latency in exactly-once mode.
+
+Most commonly, checkpoint duration is affected by:
+
+1. Barrier travel time and alignment, addressed by
+   [Unaligned checkpoints]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#unaligned-checkpoints" >}})
+   and [Buffer debloating]({{< ref 
"docs/ops/state/checkpointing_under_backpressure#buffer-debloating" >}})
+2. Snapshot creation time (so-called synchronous phase), addressed by 
Asynchronous snapshots
+3. Snapshot upload time (asynchronous phase)
+
+The latter (upload time) can be decreased by [Incremental checkpoints]({{< ref 
"#incremental-checkpoints" >}}). However,
+even with Incremental checkpoints, large deployments tend to have at least one 
task in every checkpoint that uploads a
+lot of data (e.g. after compaction).
+
+With Changelog enabled, Flink uploads state changes continuously, forming a 
changelog. On checkpoint, only the relevant
+part of this changelog needs to be uploaded. Independently, configured state 
backend is checkpointed in the
+background periodically. Upon successful upload, changelog is truncated.
+
+As a result, asynchronous phase is reduced, as well as synchronous phase (in 
particular, long-tail).
+
+On the flip side, resource usage is higher:
+
+- more files are created on DFS
+- more IO bandwidth is used to upload
+- more CPU used to serialize state changes
+- more memory used by Task Managers to buffer state changes
+- todo: more details after testing, maybe link to blogpost
+
+Recovery time is another thing to consider. Depending on the 
`state.backend.changelog.periodic-materialize.interval`,
+changelog can become lengthy and replaying it may take more time. However, 
recovery time combined with checkpoint
+duration will likely be still lower than in non-changelog setup, providing 
lower end-to-end latency even in failover
+case.

Review comment:
       `However, recovery time combined with checkpoint duration will likely be still lower than in non-changelog setup, providing lower end-to-end latency even in failover case.`
   
   I am not sure of that.



