pnowojski commented on a change in pull request #16628:
URL: https://github.com/apache/flink/pull/16628#discussion_r680156845
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -543,35 +543,34 @@
/** The total time for which automated adjusted buffers should be fully consumed. */
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
- public static final ConfigOption<Integer> AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME =
- ConfigOptions.key(
- "taskmanager.network.memory.automatic-buffer-adjustment.consumption-time")
- .intType()
- .defaultValue(1000)
+ public static final ConfigOption<Duration> BUFFER_DEBLOAT_TARGET =
+ ConfigOptions.key("taskmanager.network.memory.buffer-debloat.target")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
.withDescription(
- "The total time for which automated adjusted buffers should be fully consumed. "
- + "This means that the in-flight data between two subtask should be fully consumed for approximately this time.");
+ "The target total time after which buffered in-flight data should be fully consumed. "
+ + "This configuration option will be used, in combination with the measured throughput, to adjust the amount of in-flight data.");
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
- public static final ConfigOption<Boolean> AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED =
- ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.enabled")
+ public static final ConfigOption<Boolean> BUFFER_DEBLOAT_ENABLED =
+ ConfigOptions.key("taskmanager.network.memory.buffer-debloat.enabled")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
Review comment:
Ok, wait a second 😅 I meant that in production code it should be disabled
until we are certain it's good enough to be used by default. I doubt we can get
that confidence during this release cycle. I meant to enable it by default in
tests, or maybe randomise it, similar to how unaligned checkpoints are randomly
turned on/off in ITCases.
I think the relevant code for randomisation is in
`TestStreamEnvironment#setAsContext()`, but you could ask Arvid or @curcur to
make sure that's the right place to hook in.
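A rough sketch of the idea, not the actual `TestStreamEnvironment` code: a plain `Map` stands in for Flink's `Configuration`, and the class and method names (`DebloatRandomizationSketch`, `randomizeIfUnset`, `isDebloatEnabled`) are hypothetical. Only the option key comes from this PR. The production default stays `false`, and the test setup picks a random value unless the test has set one explicitly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Sketch only: a plain Map stands in for Flink's Configuration. */
public class DebloatRandomizationSketch {
    static final String DEBLOAT_ENABLED_KEY =
            "taskmanager.network.memory.buffer-debloat.enabled";

    // Production default stays off until the feature has proven itself.
    static final boolean PRODUCTION_DEFAULT = false;

    /** Test-only hook: pick a random value unless the test set one explicitly. */
    static void randomizeIfUnset(Map<String, String> conf, Random random) {
        if (!conf.containsKey(DEBLOAT_ENABLED_KEY)) {
            conf.put(DEBLOAT_ENABLED_KEY, String.valueOf(random.nextBoolean()));
        }
    }

    /** Reads the flag, falling back to the production default when unset. */
    static boolean isDebloatEnabled(Map<String, String> conf) {
        String value = conf.get(DEBLOAT_ENABLED_KEY);
        return value == null ? PRODUCTION_DEFAULT : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        randomizeIfUnset(conf, new Random());
        System.out.println(DEBLOAT_ENABLED_KEY + " = " + isDebloatEnabled(conf));
    }
}
```

Leaving explicitly set values untouched means a test that depends on a specific setting can still pin it.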
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/autobuffersize/BufferSizeCalculator.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.autobuffersize;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+
+import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME;
+import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD;
+import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_THRESHOLD_PERCENTAGES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Class for automatic calculation of the buffer size based on the current throughput and
+ * configuration.
+ */
+public class BufferSizeCalculator {
+ private static final double MILLIS_IN_SECOND = 1000.0;
+ private final ThroughputCalculator throughputCalculator;
+ private final int bufferAdjustmentPeriod;
+
+ /**
+ * How different should be the total buffer size compare to throughput (when it is 1.0 then
+ * bufferSize == throughput).
+ */
+ private final double throughputFactor;
+
+ private final IndexedInputGate[] inputGates;
+ private final long maxBufferSize;
+ private final long minBufferSize;
+ private final int bufferAdjustmentThresholdPercentages;
+ private final boolean isBufferSizeRecalculationEnabled;
+
+ private long lastBufferSize;
+
+ public BufferSizeCalculator(
+ ThroughputCalculator throughputCalculator,
+ Configuration taskConfig,
+ IndexedInputGate[] inputGates) {
+ this.throughputCalculator = throughputCalculator;
+ this.inputGates = inputGates;
+ this.bufferAdjustmentPeriod = taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD);
+ this.throughputFactor =
+ taskConfig.get(AUTOMATIC_BUFFER_ADJUSTMENT_CONSUMPTION_TIME) / MILLIS_IN_SECOND;
+ this.maxBufferSize = taskConfig.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
+ this.minBufferSize = taskConfig.get(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE).getBytes();
+ this.isBufferSizeRecalculationEnabled =
+ taskConfig.get(TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_ENABLED);
Review comment:
I mean that in that case I would expect the throughput to also be
calculated/updated from the outside, with this class only reading it through a
getter, without causing any side effects.
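To illustrate the shape I have in mind (a simplified sketch, not a drop-in replacement for `BufferSizeCalculator`): the throughput arrives through a read-only `LongSupplier`, and the calculation itself has no side effects. The class name `BufferSizeSketch`, the `buffersInUse` parameter, and the per-buffer division are assumptions made for the example. Only the "throughput times target time, bounded by min/max size" idea is taken from the code above.

```java
import java.util.function.LongSupplier;

/** Sketch only: reads the throughput through a getter and never updates it. */
public class BufferSizeSketch {
    private final LongSupplier throughputBytesPerSecond; // measured elsewhere
    private final double targetSeconds; // e.g. 1.0 for a one-second target
    private final long minBufferSize;
    private final long maxBufferSize;

    public BufferSizeSketch(
            LongSupplier throughputBytesPerSecond,
            double targetSeconds,
            long minBufferSize,
            long maxBufferSize) {
        this.throughputBytesPerSecond = throughputBytesPerSecond;
        this.targetSeconds = targetSeconds;
        this.minBufferSize = minBufferSize;
        this.maxBufferSize = maxBufferSize;
    }

    /** Pure function of the current throughput; no state is modified. */
    public long calculateBufferSize(int buffersInUse) {
        // Total bytes that should be in flight to be consumed in targetSeconds.
        long desiredTotal = (long) (throughputBytesPerSecond.getAsLong() * targetSeconds);
        // Hypothetical split across buffers; guard against division by zero.
        long perBuffer = desiredTotal / Math.max(1, buffersInUse);
        // Keep the result within the configured segment size bounds.
        return Math.max(minBufferSize, Math.min(maxBufferSize, perBuffer));
    }

    public static void main(String[] args) {
        BufferSizeSketch sketch = new BufferSizeSketch(() -> 1_000_000L, 1.0, 256, 32_768);
        System.out.println(sketch.calculateBufferSize(100));
    }
}
```

With this split, whoever owns the measurement decides when it is updated, and the calculator can be unit-tested by passing a constant supplier.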
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
##########
@@ -268,9 +268,7 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
@Override
public int getNumberOfQueuedBuffers() {
- synchronized (lock) {
- return getBuffersInBacklogUnsafe();
- }
Review comment:
Shouldn't this change affect some unit tests? Does this mean it doesn't
have any test coverage?