pnowojski commented on a change in pull request #16628:
URL: https://github.com/apache/flink/pull/16628#discussion_r679138892



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -307,6 +307,12 @@ private void notifyCreditAvailable() throws IOException {
         partitionRequestClient.notifyCreditAvailable(this);
     }
 
+    private void notifyNewBufferSize() throws IOException {
+        checkPartitionRequestQueueInitialized();
+
+        partitionRequestClient.notifyNewBufferSize(this);

Review comment:
       Because you are passing `this`, it enforces you to add a synchronisation 
point, complicate threading contract and makes you use `AtomicLong` later as 
`currentBufferSize`. 
   
   Can we pass the `currentBufferSize` directly here? 

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -531,6 +541,38 @@
                     .withDescription(
                             "The number of the last buffer size values that 
will be taken for the correct calculation of the new one.");
 
+    /** The total time for which automated adjusted buffers should be fully 
consumed. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =

Review comment:
       Duration (ditto for fixing the already merged code to the master)

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -531,6 +541,38 @@
                     .withDescription(
                             "The number of the last buffer size values that 
will be taken for the correct calculation of the new one.");
 
+    /** The total time for which automated adjusted buffers should be fully 
consumed. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The total time for which automated adjusted 
buffers should be fully consumed. "
+                                    + "This means that the in-flight data 
between two subtask should be fully consumed for approximately this time.");

Review comment:
       > The target total time after which buffered in-flight data should be 
fully consumed. This configuration option will be used, in combination with the 
measured throughput, to adjust the amount of in-flight data.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -531,6 +541,38 @@
                     .withDescription(
                             "The number of the last buffer size values that 
will be taken for the correct calculation of the new one.");
 
+    /** The total time for which automated adjusted buffers should be fully 
consumed. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The total time for which automated adjusted 
buffers should be fully consumed. "
+                                    + "This means that the in-flight data 
between two subtask should be fully consumed for approximately this time.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Boolean> 
AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED =
+            
ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The switch of the feature of the automatic buffer 
adjustment. "
+                                    + "If enabled the size of the buffer will 
be adjusted automatically depends on the current throughput.");

Review comment:
       > The switch of the automatic buffered debloating feature.
   > If enabled the amount of in-flight data will be adjusted automatically 
accordingly to the measured throughput.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -531,6 +541,38 @@
                     .withDescription(
                             "The number of the last buffer size values that 
will be taken for the correct calculation of the new one.");
 
+    /** The total time for which automated adjusted buffers should be fully 
consumed. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")

Review comment:
       Somehow `consumption-time` doesn't sound right to me. Maybe something as 
simple as:
   ```
   taskmanager.network.memory.buffer-debloat.target
   ```
   ?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/autobuffersize/BufferSizeCalculator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.autobuffersize;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Class for automatic calculation of the buffer size based on the current 
throughput and
+ * configuration.
+ */
+public class BufferSizeCalculator {

Review comment:
       `BufferDebloater`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
##########
@@ -64,4 +64,8 @@ public void blockConsumption(InputChannelInfo channelInfo) {
     public void convertToPriorityEvent(int channelIndex, int sequenceNumber) 
throws IOException {
         getChannel(channelIndex).convertToPriorityEvent(sequenceNumber);
     }
+
+    public abstract int getBuffersInUseCount();

Review comment:
       I can see that. Let me think about it tomorrow.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/autobuffersize/BufferSizeCalculator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.autobuffersize;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Class for automatic calculation of the buffer size based on the current 
throughput and
+ * configuration.
+ */
+public class BufferSizeCalculator {
+    private static final double MILLIS_IN_SECOND = 1000.0;
+    private final ThroughputCalculator throughputCalculator;
+    private final int bufferAdjustmentPeriod;
+
+    /**
+     * How different should be the total buffer size compare to throughput 
(when it is 1.0 then
+     * bufferSize == throughput).
+     */
+    private final double throughputFactor;
+
+    private final IndexedInputGate[] inputGates;
+    private final long maxBufferSize;
+    private final long minBufferSize;
+    private final int bufferAdjustmentThresholdPercentages;
+    private final boolean isBufferSizeRecalculationEnabled;
+
+    private long lastBufferSize;
+
+    public BufferSizeCalculator(
+            ThroughputCalculator throughputCalculator,
+            Configuration taskConfig,
+            IndexedInputGate[] inputGates) {
+        this.throughputCalculator = throughputCalculator;
+        this.inputGates = inputGates;
+        this.bufferAdjustmentPeriod = 
taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD);
+        this.throughputFactor =
+                taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME) / 
MILLIS_IN_SECOND;
+        this.maxBufferSize = 
taskConfig.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
+        this.minBufferSize = 
taskConfig.get(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE).getBytes();
+        this.isBufferSizeRecalculationEnabled =
+                
taskConfig.get(TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED);
+
+        this.bufferAdjustmentThresholdPercentages =
+                
taskConfig.getInteger(AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES);
+
+        this.lastBufferSize = maxBufferSize;
+
+        checkArgument(maxBufferSize > 0);
+        checkArgument(minBufferSize > 0);
+        checkArgument(maxBufferSize >= minBufferSize);
+        checkArgument(throughputFactor > 0.0);
+    }
+
+    public void init(TimerService systemTimerService, MailboxExecutor 
mailboxExecutor) {
+        scheduleNextRecalculation(systemTimerService, mailboxExecutor);
+    }
+
+    private void scheduleNextRecalculation(
+            TimerService systemTimerService, MailboxExecutor mailboxExecutor) {
+        systemTimerService.registerTimer(
+                systemTimerService.getCurrentProcessingTime() + 
bufferAdjustmentPeriod,
+                timestamp ->
+                        mailboxExecutor.submit(
+                                () -> {
+                                    recalculateBufferSize();
+                                    
scheduleNextRecalculation(systemTimerService, mailboxExecutor);
+                                },
+                                "Buffer size recalculation"));
+    }
+
+    @VisibleForTesting
+    void recalculateBufferSize() {
+        long totalBuffersSize =

Review comment:
       maybe `desiredTotalBufferSizeInBytes`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/autobuffersize/BufferSizeCalculator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.autobuffersize;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Class for automatic calculation of the buffer size based on the current 
throughput and
+ * configuration.
+ */
+public class BufferSizeCalculator {
+    private static final double MILLIS_IN_SECOND = 1000.0;
+    private final ThroughputCalculator throughputCalculator;
+    private final int bufferAdjustmentPeriod;
+
+    /**
+     * How different should be the total buffer size compare to throughput 
(when it is 1.0 then
+     * bufferSize == throughput).
+     */
+    private final double throughputFactor;
+
+    private final IndexedInputGate[] inputGates;
+    private final long maxBufferSize;
+    private final long minBufferSize;
+    private final int bufferAdjustmentThresholdPercentages;
+    private final boolean isBufferSizeRecalculationEnabled;
+
+    private long lastBufferSize;
+
+    public BufferSizeCalculator(
+            ThroughputCalculator throughputCalculator,
+            Configuration taskConfig,
+            IndexedInputGate[] inputGates) {
+        this.throughputCalculator = throughputCalculator;
+        this.inputGates = inputGates;
+        this.bufferAdjustmentPeriod = 
taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD);
+        this.throughputFactor =

Review comment:
       rename to `targetBufferSizeInMillis`, 
`configuredTotalBufferSizeInMillis`?

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -531,6 +541,38 @@
                     .withDescription(
                             "The number of the last buffer size values that 
will be taken for the correct calculation of the new one.");
 
+    /** The total time for which automated adjusted buffers should be fully 
consumed. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The total time for which automated adjusted 
buffers should be fully consumed. "
+                                    + "This means that the in-flight data 
between two subtask should be fully consumed for approximately this time.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Boolean> 
AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED =
+            
ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.enabled")
+                    .booleanType()
+                    .defaultValue(false)

Review comment:
       we should enable it by default or at least randomise in the tests

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -531,6 +541,38 @@
                     .withDescription(
                             "The number of the last buffer size values that 
will be taken for the correct calculation of the new one.");
 
+    /** The total time for which automated adjusted buffers should be fully 
consumed. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The total time for which automated adjusted 
buffers should be fully consumed. "
+                                    + "This means that the in-flight data 
between two subtask should be fully consumed for approximately this time.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Boolean> 
AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED =
+            
ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The switch of the feature of the automatic buffer 
adjustment. "
+                                    + "If enabled the size of the buffer will 
be adjusted automatically depends on the current throughput.");
+
+    /**
+     * Difference between the new and the old buffer size for applying the new 
value(in percent).
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.threshold-percentages")
+                    .intType()
+                    .defaultValue(10)

Review comment:
       `25%`? `50%`? `10%` sounds to small for me. Whether we buffer 1s or 1.1s 
it really doesn't matter.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -531,6 +541,38 @@
                     .withDescription(
                             "The number of the last buffer size values that 
will be taken for the correct calculation of the new one.");
 
+    /** The total time for which automated adjusted buffers should be fully 
consumed. */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The total time for which automated adjusted 
buffers should be fully consumed. "
+                                    + "This means that the in-flight data 
between two subtask should be fully consumed for approximately this time.");
+
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Boolean> 
AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED =
+            
ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The switch of the feature of the automatic buffer 
adjustment. "
+                                    + "If enabled the size of the buffer will 
be adjusted automatically depends on the current throughput.");
+
+    /**
+     * Difference between the new and the old buffer size for applying the new 
value(in percent).
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES =
+            ConfigOptions.key(
+                            
"taskmanager.network.memory.automatic-buffer-adjustment.threshold-percentages")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The minimum percentages which should be the 
difference between the newly calculated buffer size and the old one for 
applying the new value.");

Review comment:
       > The minimum difference in percentage between the newly calculated 
buffer size and the old one to announce the new value. Can be used to avoid 
constant back and forth small adjustments.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/autobuffersize/BufferSizeCalculator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.autobuffersize;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Class for automatic calculation of the buffer size based on the current 
throughput and
+ * configuration.
+ */
+public class BufferSizeCalculator {
+    private static final double MILLIS_IN_SECOND = 1000.0;
+    private final ThroughputCalculator throughputCalculator;
+    private final int bufferAdjustmentPeriod;
+
+    /**
+     * How different should be the total buffer size compare to throughput 
(when it is 1.0 then
+     * bufferSize == throughput).
+     */
+    private final double throughputFactor;
+
+    private final IndexedInputGate[] inputGates;
+    private final long maxBufferSize;
+    private final long minBufferSize;
+    private final int bufferAdjustmentThresholdPercentages;
+    private final boolean isBufferSizeRecalculationEnabled;
+
+    private long lastBufferSize;
+
+    public BufferSizeCalculator(
+            ThroughputCalculator throughputCalculator,
+            Configuration taskConfig,
+            IndexedInputGate[] inputGates) {
+        this.throughputCalculator = throughputCalculator;
+        this.inputGates = inputGates;
+        this.bufferAdjustmentPeriod = 
taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD);
+        this.throughputFactor =
+                taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME) / 
MILLIS_IN_SECOND;
+        this.maxBufferSize = 
taskConfig.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
+        this.minBufferSize = 
taskConfig.get(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE).getBytes();
+        this.isBufferSizeRecalculationEnabled =
+                
taskConfig.get(TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED);

Review comment:
       I think this should be handled from the outside?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/autobuffersize/BufferSizeCalculator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.autobuffersize;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD;
+import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Class for automatic calculation of the buffer size based on the current 
throughput and
+ * configuration.
+ */
+public class BufferSizeCalculator {
+    private static final double MILLIS_IN_SECOND = 1000.0;
+    private final ThroughputCalculator throughputCalculator;
+    private final int bufferAdjustmentPeriod;
+
+    /**
+     * How different should be the total buffer size compare to throughput 
(when it is 1.0 then
+     * bufferSize == throughput).
+     */
+    private final double throughputFactor;
+
+    private final IndexedInputGate[] inputGates;
+    private final long maxBufferSize;
+    private final long minBufferSize;
+    private final int bufferAdjustmentThresholdPercentages;
+    private final boolean isBufferSizeRecalculationEnabled;
+
+    private long lastBufferSize;
+
+    public BufferSizeCalculator(
+            ThroughputCalculator throughputCalculator,
+            Configuration taskConfig,
+            IndexedInputGate[] inputGates) {
+        this.throughputCalculator = throughputCalculator;
+        this.inputGates = inputGates;
+        this.bufferAdjustmentPeriod = 
taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD);
+        this.throughputFactor =
+                taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME) / 
MILLIS_IN_SECOND;

Review comment:
       `Duration#toMillis()`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
##########
@@ -266,6 +266,13 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
         return 0;
     }
 
+    @Override
+    public int getNumberOfQueuedBuffers() {
+        synchronized (lock) {
+            return getBuffersInBacklogUnsafe();

Review comment:
       are you sure this is correct? It looks to me like 
`getBuffersInBacklogUnsafe()` counter is ever increasing one?
   
   Besides, even if it had indeed thousands of buffers in backlog spilled on 
disks, I don't think we should be using that number for debloating... Maybe we 
should cap this returned number with 
`taskmanager.network.memory.max-buffers-per-channel`? And also cup the sum the 
number of total number of buffers in use/backlog with `#exclusiveBuffers * 
#channels + #floatingBuffers`?
   
   Furthermore, debloating with `BoundedBlockingSubpartition` probably doesn't 
matter at the moment, as those partitions do not support checkpointing and are 
used only in batch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to