akalash commented on a change in pull request #16628:
URL: https://github.com/apache/flink/pull/16628#discussion_r680918747



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -543,35 +543,34 @@
 
     /** The total time for which automated adjusted buffers should be fully 
consumed. */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
-    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
-            ConfigOptions.key(
-                            
"taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")
-                    .intType()
-                    .defaultValue(1000)
+    public static final ConfigOption<Duration> BUFFER_DEBLOAT_TARGET =
+            
ConfigOptions.key("taskmanager.network.memory.buffer-debloat.target")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
                     .withDescription(
-                            "The total time for which automated adjusted 
buffers should be fully consumed. "
-                                    + "This means that the in-flight data 
between two subtask should be fully consumed for approximately this time.");
+                            "The target total time after which buffered 
in-flight data should be fully consumed. "
+                                    + "This configuration option will be used, 
in combination with the measured throughput, to adjust the amount of in-flight 
data.");
 
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
-    public static final ConfigOption<Boolean> 
AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED =
-            
ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.enabled")
+    public static final ConfigOption<Boolean> BUFFER_DEBLOAT_ENABLED =
+            
ConfigOptions.key("taskmanager.network.memory.buffer-debloat.enabled")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(true)

Review comment:
       Ok, I turned it back to false and also I added the randomization in 
tests(see my last commit) but I don't sure where we should turn it on( the 
randomization for the checkpoint was disabled by default and I did the same).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
##########
@@ -268,9 +268,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 
     @Override
     public int getNumberOfQueuedBuffers() {
-        synchronized (lock) {
-            return getBuffersInBacklogUnsafe();
-        }

Review comment:
       Specifically, BoundedBlockingSubpartition didn't have such a test that 
checks the correct number of buffers in the backlog. But I added such tests for 
PipelinedSubpartition and RemotInputChannel




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to