This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d8477bda39434da9e0a779b06d651e46deb9fefb Author: Anton Kalashnikov <[email protected]> AuthorDate: Fri Jul 30 18:13:44 2021 +0200 [FLINK-23453][core] Renaming configuration automatic-buffer-adjustment -> buffer-debloat --- .../generated/all_taskmanager_network_section.html | 22 +++++++++++----------- .../generated/task_manager_configuration.html | 22 +++++++++++----------- .../flink/configuration/TaskManagerOptions.java | 14 +++++++------- .../org/apache/flink/runtime/taskmanager/Task.java | 5 ++--- .../flink/streaming/runtime/tasks/StreamTask.java | 4 ++-- .../streaming/runtime/tasks/StreamTaskTest.java | 7 ++++--- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html index 135cbfa..cc6ec6a 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html @@ -27,24 +27,24 @@ <td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td> </tr> <tr> - <td><h5>taskmanager.network.memory.automatic-buffer-adjustment.period</h5></td> - <td style="word-wrap: break-word;">500</td> - <td>Integer</td> - <td>The minimum period of time after which the buffer size will be automatically adjusted to a new value if required. The low value provides a fast reaction to the load fluctuation but can influence the performance.</td> + <td><h5>taskmanager.network.memory.buffer-debloat.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>The switch of the automatic buffered debloating feature. If enabled the amount of in-flight data will be adjusted automatically accordingly to the measured throughput.</td> + </tr> + <tr> + <td><h5>taskmanager.network.memory.buffer-debloat.period</h5></td> + <td style="word-wrap: break-word;">500 ms</td> + <td>Duration</td> + <td>The minimum period of time after which the buffer size will be debloated if required. The low value provides a fast reaction to the load fluctuation but can influence the performance.</td> </tr> <tr> - <td><h5>taskmanager.network.memory.automatic-buffer-adjustment.samples</h5></td> + <td><h5>taskmanager.network.memory.buffer-debloat.samples</h5></td> <td style="word-wrap: break-word;">20</td> <td>Integer</td> <td>The number of the last buffer size values that will be taken for the correct calculation of the new one.</td> </tr> <tr> - <td><h5>taskmanager.network.memory.buffer-debloat.enabled</h5></td> - <td style="word-wrap: break-word;">false</td> - <td>Boolean</td> - <td>The switch of the automatic buffered debloating feature. If enabled the amount of in-flight data will be adjusted automatically accordingly to the measured throughput.</td> - </tr> - <tr> <td><h5>taskmanager.network.memory.buffer-debloat.target</h5></td> <td style="word-wrap: break-word;">1 s</td> <td>Duration</td> diff --git a/docs/layouts/shortcodes/generated/task_manager_configuration.html b/docs/layouts/shortcodes/generated/task_manager_configuration.html index b6bc41c..0bd6ce8 100644 --- a/docs/layouts/shortcodes/generated/task_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/task_manager_configuration.html @@ -64,24 +64,24 @@ <ul><li>"name" - uses hostname as binding address</li><li>"ip" - uses host's ip address as binding address</li></ul></td> </tr> <tr> - <td><h5>taskmanager.network.memory.automatic-buffer-adjustment.period</h5></td> - <td style="word-wrap: break-word;">500</td> - <td>Integer</td> - <td>The minimum period of time after which the buffer size will be automatically adjusted to a new value if required. The low value provides a fast reaction to the load fluctuation but can influence the performance.</td> + <td><h5>taskmanager.network.memory.buffer-debloat.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>The switch of the automatic buffered debloating feature. If enabled the amount of in-flight data will be adjusted automatically accordingly to the measured throughput.</td> + </tr> + <tr> + <td><h5>taskmanager.network.memory.buffer-debloat.period</h5></td> + <td style="word-wrap: break-word;">500 ms</td> + <td>Duration</td> + <td>The minimum period of time after which the buffer size will be debloated if required. The low value provides a fast reaction to the load fluctuation but can influence the performance.</td> </tr> <tr> - <td><h5>taskmanager.network.memory.automatic-buffer-adjustment.samples</h5></td> + <td><h5>taskmanager.network.memory.buffer-debloat.samples</h5></td> <td style="word-wrap: break-word;">20</td> <td>Integer</td> <td>The number of the last buffer size values that will be taken for the correct calculation of the new one.</td> </tr> <tr> - <td><h5>taskmanager.network.memory.buffer-debloat.enabled</h5></td> - <td style="word-wrap: break-word;">false</td> - <td>Boolean</td> - <td>The switch of the automatic buffered debloating feature. If enabled the amount of in-flight data will be adjusted automatically accordingly to the measured throughput.</td> - </tr> - <tr> <td><h5>taskmanager.network.memory.buffer-debloat.target</h5></td> <td style="word-wrap: break-word;">1 s</td> <td>Duration</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index b3a0bcd..dacc973 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -524,18 +524,18 @@ public class TaskManagerOptions { /** The period between recalculation the relevant size of the buffer. */ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) - public static final ConfigOption<Integer> AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD = - ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.period") - .intType() - .defaultValue(500) + public static final ConfigOption<Duration> BUFFER_DEBLOAT_PERIOD = + ConfigOptions.key("taskmanager.network.memory.buffer-debloat.period") + .durationType() + .defaultValue(Duration.ofMillis(500)) .withDescription( - "The minimum period of time after which the buffer size will be automatically adjusted to a new value if required. " + "The minimum period of time after which the buffer size will be debloated if required. " + "The low value provides a fast reaction to the load fluctuation but can influence the performance."); /** The number of samples requires for the buffer size adjustment. */ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) - public static final ConfigOption<Integer> AUTOMATIC_BUFFER_ADJUSTMENT_SAMPLES = - ConfigOptions.key("taskmanager.network.memory.automatic-buffer-adjustment.samples") + public static final ConfigOption<Integer> BUFFER_DEBLOAT_SAMPLES = + ConfigOptions.key("taskmanager.network.memory.buffer-debloat.samples") .intType() .defaultValue(20) .withDescription( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index f2b1ba9e..f50bb44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -113,7 +113,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; -import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_SAMPLES; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -420,8 +420,7 @@ public class Task this.inputGates = new IndexedInputGate[gates.length]; this.throughputCalculator = new ThroughputCalculator( - SystemClock.getInstance(), - taskConfiguration.get(AUTOMATIC_BUFFER_ADJUSTMENT_SAMPLES)); + SystemClock.getInstance(), taskConfiguration.get(BUFFER_DEBLOAT_SAMPLES)); int counter = 0; for (IndexedInputGate gate : gates) { inputGates[counter++] = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index ebdfbdb..851183b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -119,7 +119,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; -import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; import static org.apache.flink.util.ExceptionUtils.rethrowException; import static org.apache.flink.util.Preconditions.checkState; @@ -395,7 +395,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true); this.throughputCalculator = environment.getThroughputMeter(); - this.bufferDebloatPeriod = getTaskConfiguration().get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD); + this.bufferDebloatPeriod = getTaskConfiguration().get(BUFFER_DEBLOAT_PERIOD).toMillis(); this.bufferDebloater = getTaskConfiguration().get(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 8f8220c..f721354 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -171,8 +171,8 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; -import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD; import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD; import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton; @@ -1819,7 +1819,8 @@ public class StreamTaskTest extends TestLogger { try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder() .setTaskConfiguration( - new Configuration().set(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD, 1)) + new Configuration() + .set(BUFFER_DEBLOAT_PERIOD, Duration.ofMillis(1))) .setThroughputMeter( new ThroughputCalculator(SystemClock.getInstance(), 10) { @Override @@ -1888,7 +1889,7 @@ public class StreamTaskTest extends TestLogger { int inputChannels = 3; Consumer<StreamConfig> configuration = (config) -> { - config.getConfiguration().set(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD, 10); + config.getConfiguration().set(BUFFER_DEBLOAT_PERIOD, Duration.ofMillis(10)); config.getConfiguration().set(BUFFER_DEBLOAT_TARGET, Duration.ofSeconds(1)); config.getConfiguration().set(BUFFER_DEBLOAT_ENABLED, true); };
