pnowojski commented on a change in pull request #16988:
URL: https://github.com/apache/flink/pull/16988#discussion_r700061022



##########
File path: docs/content/docs/deployment/memory/network_mem_tuning.md
##########
@@ -0,0 +1,160 @@
+---
+title: "Network Buffer Tuning"
+weight: 100
+type: docs
+aliases:
+  - /deployment/memory/network_mem_tuning.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Network memory tuning guide
+
+## Overview
+
+Records in Flink are not sent to the next subtask individually, but are grouped together into network buffers,
+the smallest units of communication between subtasks. In order to maintain a consistently high throughput,
+Flink uses network buffer queues (the so-called in-flight data) on both the output and the input side.
+As a result, each subtask has an input queue of data waiting to be consumed and an output queue of data
+waiting to be sent to the next subtask. A larger amount of in-flight data means that Flink can provide
+higher throughput that is more resilient to small hiccups in the pipeline, but it also has a negative
+effect on checkpointing times.
+
+A long checkpointing time can have many causes, one of which is the checkpoint barrier propagation time.
+A checkpoint in Flink can only finish once every subtask has received all of the injected checkpoint
+barriers. In [aligned checkpoints]({{< ref "docs/concepts/stateful-stream-processing" >}}#checkpointing)
+those checkpoint barriers travel throughout the job graph along with the network buffers, so the larger
+the amount of in-flight data, the longer the checkpoint barrier propagation time.
+In [unaligned checkpoints]({{< ref "docs/concepts/stateful-stream-processing" >}}#unaligned-checkpointing),
+on the other hand, the more in-flight data there is, the larger the checkpoint size becomes, because
+all of the captured in-flight data has to be persisted as part of the checkpoint.
+
+## Buffer debloat
+
+Historically, the only way to configure the amount of in-flight data was to specify both the number
+and the size of the buffers. However, ideal values for those numbers are hard to pick, as they differ
+for every deployment. The buffer debloating mechanism added in Flink 1.14 attempts to address this issue
+by automatically adjusting the amount of in-flight data to reasonable values.
+More precisely, buffer debloating estimates the maximum possible throughput of the subtask
+(the throughput it would reach if it were always busy) and adjusts the amount of in-flight data
+so that the time needed to consume that in-flight data matches the configured target value.
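+
+As a rough, hedged illustration of this heuristic (the class, method, and variable names below are
+made up for this sketch and are not Flink's actual internals):
+
+```java
+// Simplified sketch: size the buffers so that the queued in-flight data can be
+// consumed within the configured debloat target time.
+public class DebloatingSketch {
+
+    static long desiredBufferSizeBytes(
+            long maxThroughputBytesPerSecond, // estimated throughput if the subtask were always busy
+            long targetMillis,                // taskmanager.network.memory.buffer-debloat.target
+            int numberOfBuffers) {            // buffers currently in use on the input
+        long desiredInFlightBytes = maxThroughputBytesPerSecond * targetMillis / 1000;
+        // Spread the desired amount of in-flight data across the available buffers.
+        return Math.max(1, desiredInFlightBytes / numberOfBuffers);
+    }
+
+    public static void main(String[] args) {
+        // Example: 1 MB/s throughput, a 1 s target and 100 buffers give roughly 10 KB per buffer.
+        System.out.println(desiredBufferSizeBytes(1_000_000, 1_000, 100));
+    }
+}
+```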
+
+The most important settings:
+* Buffer debloating can be enabled by setting the property `taskmanager.network.memory.buffer-debloat.enabled` to `true`.
+* The targeted time to consume the in-flight data can be configured by setting `taskmanager.network.memory.buffer-debloat.target` to a `duration` value.
+  The default value of the debloat target should be good enough in most cases. A minimal configuration sketch follows below.
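+
+A minimal sketch of what that could look like for a local test environment (the configuration keys are
+the ones listed above; the target value is only illustrative, and on a real cluster the same keys would
+go into `flink-conf.yaml`):
+
+```java
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class BufferDebloatConfigSketch {
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        // Enable buffer debloating and aim at roughly 1 second of in-flight data.
+        conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
+        conf.setString("taskmanager.network.memory.buffer-debloat.target", "1s");
+
+        // For local experiments only; TaskManager options normally belong in flink-conf.yaml.
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
+        env.fromElements(1, 2, 3).print();
+        env.execute("buffer-debloat-sketch");
+    }
+}
+```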
+
+Buffer debloating in Flink works by measuring the past throughput to predict the time needed to consume
+the remaining in-flight data. If those predictions are incorrect, the debloating mechanism can fail in one of two ways:
+* There won't be enough buffered data to provide full throughput.
+* There will be too much buffered in-flight data, and the aligned checkpoint barrier propagation time or the unaligned checkpoint size will suffer.
+
+Hence, if you have a varying load in your job, for example sudden spikes of incoming records, or periodically
+firing windowed aggregations or joins, you might need to adjust the following settings:
+
+* `taskmanager.network.memory.buffer-debloat.period` - The minimum time period between buffer size recalculations.
+The shorter the period, the faster the reaction time of the debloating mechanism, but the higher the CPU overhead
+of the necessary calculations.
+* `taskmanager.network.memory.buffer-debloat.samples` - The number of samples over which throughput measurements are averaged out.
+The frequency of the collected samples can be adjusted via `taskmanager.network.memory.buffer-debloat.period`.
+The fewer the samples, the faster the reaction time of the debloating mechanism, but the higher the chance
+that a sudden spike or drop in the throughput causes buffer debloating to miscalculate the optimal amount of in-flight data.
+* `taskmanager.network.memory.buffer-debloat.threshold-percentages` - An optimization that prevents
+frequent buffer size changes when the new size is not significantly different from the old one
+(see the sketch after this list).
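+
+A hedged sketch of adjusting these knobs on top of the basic configuration shown earlier (the concrete
+values are only placeholders to show the key names, not recommendations):
+
+```java
+// Extends the `conf` object from the earlier sketch; all values below are illustrative only.
+conf.setString("taskmanager.network.memory.buffer-debloat.period", "100ms");             // recalculate more often
+conf.setString("taskmanager.network.memory.buffer-debloat.samples", "10");               // average over fewer samples
+conf.setString("taskmanager.network.memory.buffer-debloat.threshold-percentages", "10"); // react to smaller size changes
+```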
+
+See the [Configuration]({{< ref "docs/deployment/config" >}}#full-taskmanageroptions) documentation for details and additional parameters.
+
+The following [metrics]({{< ref "docs/ops/metrics" >}}#io) can help you observe the current buffer size:
+* `estimatedTimeToConsumeBuffersMs` - the total time to consume data from all input channels.
+* `debloatedBufferSize` - the current buffer size.
+

Review comment:
       Can you add the limitations section to the buffer debloating?
   
   ```
   ### Limitations
   
   Currently, there are a couple of cases that are not handled automatically by the buffer debloating feature.
   
   #### Large records
   
   If your record size exceeds `org.apache.flink.configuration.TaskManagerOptions#MIN_MEMORY_SEGMENT_SIZE`,
   buffer debloating can shrink the buffer size so much that the network stack will require more than one
   buffer to transfer a single record. This can have adverse effects on the throughput without actually
   reducing the amount of in-flight data. However, this might not be an issue for you.
   
   #### Multiple inputs and unions
   
   Currently, the throughput calculation and buffer debloating happen on the subtask level. If your subtask
   has multiple different inputs, or a single but unioned input, this can cause problems. It might be
   especially visible if those inputs have vastly different throughputs. In such a case, as a result of
   buffer debloating, the low-throughput input might still have too much buffered in-flight data, while the
   high-throughput input might end up with buffers that are too small to sustain its throughput. We recommend
   paying special attention to such subtasks when testing this feature.
   
   #### Buffer size and number of buffers
   
   Currently, buffer debloating only caps the maximum used buffer size. The actual buffer size and the
   number of buffers remain unchanged. One consequence of this is that debloating cannot reduce the memory
   usage of your job. To achieve that, you have to manually reduce either the number or the size of the
   buffers. Secondly, if you want to reduce the amount of buffered in-flight data below what buffer
   debloating currently allows, you might want to manually reduce the configured number of buffers.
   ```
   + cross reference the relevant config options/docs sections?
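
   For the "large records" limitation, something like this hedged check is what I have in mind (the average
   record size is a made-up number; only `TaskManagerOptions#MIN_MEMORY_SEGMENT_SIZE` comes from Flink):

   ```java
   import org.apache.flink.configuration.MemorySize;
   import org.apache.flink.configuration.TaskManagerOptions;

   public class LargeRecordCheck {
       public static void main(String[] args) {
           MemorySize minSegment = TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE.defaultValue();
           long averageSerializedRecordBytes = 512 * 1024; // hypothetical value for your job
           if (averageSerializedRecordBytes > minSegment.getBytes()) {
               // Debloating may shrink buffers below the record size, so a single record can span
               // multiple buffers, hurting throughput without reducing the amount of in-flight data.
               System.out.println("Records may exceed the minimum buffer size of " + minSegment);
           }
       }
   }
   ```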



