This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9158fc76263b238eeb4a0424a843cb8a59269557 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Jul 28 18:25:41 2021 +0200 [FLINK-23453][streaming] Created the buffer debloater for the ability to automatically change the buffer size based on the throughput. --- .../runtime/throughput/ThroughputCalculator.java | 5 +- .../tasks/bufferdebloat/BufferDebloater.java | 99 +++++++++++++ .../tasks/bufferdebloat/BufferDebloaterTest.java | 163 +++++++++++++++++++++ 3 files changed, 264 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java index 83aa408..e6911f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java @@ -29,7 +29,6 @@ public class ThroughputCalculator { private long currentAccumulatedDataSize; private long currentMeasurementTime; private long measurementStartTime = NOT_TRACKED; - private long lastThroughput; public ThroughputCalculator(Clock clock, int numberOfSamples) { this.clock = clock; @@ -62,13 +61,13 @@ public class ThroughputCalculator { currentMeasurementTime += clock.relativeTimeMillis() - measurementStartTime; } - lastThroughput = + long throughput = throughputEMA.calculateThroughput( currentAccumulatedDataSize, currentMeasurementTime); measurementStartTime = clock.relativeTimeMillis(); currentAccumulatedDataSize = currentMeasurementTime = 0; - return lastThroughput; + return throughput; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java new file mode 100644 index 0000000..8b2b175 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.bufferdebloat; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; + +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Class for automatic calculation of the buffer size based on the current throughput and + * configuration. + */ +public class BufferDebloater { + private static final double MILLIS_IN_SECOND = 1000.0; + + /** + * How different should be the total buffer size compare to throughput (when it is 1.0 then + * bufferSize == throughput). + */ + private final double targetBufferSizeFactor; + + private final IndexedInputGate[] inputGates; + private final long maxBufferSize; + private final long minBufferSize; + private final int bufferDebloatThresholdPercentages; + + private int lastBufferSize; + + public BufferDebloater(Configuration taskConfig, IndexedInputGate[] inputGates) { + this.inputGates = inputGates; + this.targetBufferSizeFactor = + taskConfig.get(BUFFER_DEBLOAT_TARGET).toMillis() / MILLIS_IN_SECOND; + this.maxBufferSize = taskConfig.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes(); + this.minBufferSize = taskConfig.get(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE).getBytes(); + + this.bufferDebloatThresholdPercentages = + taskConfig.getInteger(BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES); + + this.lastBufferSize = (int) maxBufferSize; + + // Right now the buffer size can not be grater than integer max value according to + // MemorySegment and buffer implementation. + checkArgument(maxBufferSize <= Integer.MAX_VALUE); + checkArgument(maxBufferSize > 0); + checkArgument(minBufferSize > 0); + checkArgument(maxBufferSize >= minBufferSize); + checkArgument(targetBufferSizeFactor > 0.0); + } + + public void recalculateBufferSize(long currentThroughput) { + long desiredTotalBufferSizeInBytes = (long) (currentThroughput * targetBufferSizeFactor); + + int totalNumber = 0; + for (IndexedInputGate inputGate : inputGates) { + totalNumber += Math.max(1, inputGate.getBuffersInUseCount()); + } + int newSize = + (int) + Math.max( + minBufferSize, + Math.min( + desiredTotalBufferSizeInBytes / totalNumber, + maxBufferSize)); + + boolean skipUpdate = + Math.abs(1 - ((double) lastBufferSize) / newSize) * 100 + < bufferDebloatThresholdPercentages; + + // Skip update if the new value pretty close to the old one. + if (skipUpdate) { + return; + } + + lastBufferSize = newSize; + for (IndexedInputGate inputGate : inputGates) { + inputGate.announceBufferSize(newSize); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java new file mode 100644 index 0000000..853f6c2 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.bufferdebloat; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.streaming.runtime.io.MockInputGate; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.apache.flink.configuration.MemorySize.MemoryUnit.BYTES; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET; +import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_SEGMENT_SIZE; +import static org.apache.flink.configuration.TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** Test for {@link BufferDebloater}. */ +public class BufferDebloaterTest extends TestLogger { + + @Test + public void testZeroBuffersInUse() { + // if the gate returns the zero buffers in use it should be transformed to 1. + testBufferSizeCalculation(3, asList(0, 1, 0), 3333, 50, 2400, 1000, 1111); + } + + @Test + public void testCorrectBufferSizeCalculation() { + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, 1200, 249); + } + + @Test + public void testCalculatedBufferSizeLessThanMin() { + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 250, 1100, 1200, 250); + } + + @Test + public void testCalculatedBufferSizeForThroughputZero() { + // When the throughput is zero then min buffer size will be taken. + testBufferSizeCalculation(3, asList(3, 5, 8), 0, 50, 1100, 1200, 50); + } + + @Test + public void testConfiguredConsumptionTimeIsTooLow() { + // When the consumption time is low then min buffer size will be taken. + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, 7, 50); + } + + @Test + public void testCalculatedBufferSizeGreaterThanMax() { + // New calculated buffer size should be more than max value it means that we should take max + // value which means that no updates should happen(-1 means that we take the initial value) + // because the old value equal to new value. + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 248, 1200, -1); + } + + @Test + public void testCalculatedBufferSlightlyDifferentFromCurrentOne() { + // New calculated buffer size should be a little less than current value(or max value which + // is the same) it means that no updates should happen(-1 means that we take the initial + // value) because the new value is not so different from the old one. + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 250, 1200, -1); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeMinBufferSize() { + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, -1, 248, 1200, 248); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeMaxBufferSize() { + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, -1, 1200, 248); + } + + @Test(expected = IllegalArgumentException.class) + public void testMinGreaterThanMaxBufferSize() { + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 49, 1200, 248); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeConsumptionTime() { + testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, -1, 248); + } + + private void testBufferSizeCalculation( + int numberOfGates, + List<Integer> numberOfBuffersInUse, + long throughput, + long minBufferSize, + long maxBufferSize, + int consumptionTime, + long expectedBufferSize) { + TestBufferSizeInputGate[] inputGates = new TestBufferSizeInputGate[numberOfGates]; + for (int i = 0; i < numberOfGates; i++) { + inputGates[i] = new TestBufferSizeInputGate(numberOfBuffersInUse.get(i)); + } + + BufferDebloater bufferDebloater = + new BufferDebloater( + new Configuration() + .set(BUFFER_DEBLOAT_ENABLED, true) + .set(BUFFER_DEBLOAT_TARGET, Duration.ofMillis(consumptionTime)) + .set( + MEMORY_SEGMENT_SIZE, + MemorySize.parse("" + maxBufferSize, BYTES)) + .set( + MIN_MEMORY_SEGMENT_SIZE, + MemorySize.parse("" + minBufferSize, BYTES)), + inputGates); + + // when: Buffer size is calculated. + bufferDebloater.recalculateBufferSize(throughput); + + // then: Buffer size is in all gates should be as expected. + for (int i = 0; i < numberOfGates; i++) { + assertThat(inputGates[i].lastBufferSize, is(expectedBufferSize)); + } + } + + private static class TestBufferSizeInputGate extends MockInputGate { + private long lastBufferSize = -1; + private final int bufferInUseCount; + + public TestBufferSizeInputGate(int bufferInUseCount) { + // Number of channels don't make sense here because + super(1, Collections.emptyList()); + this.bufferInUseCount = bufferInUseCount; + } + + @Override + public int getBuffersInUseCount() { + return bufferInUseCount; + } + + @Override + public void announceBufferSize(int bufferSize) { + lastBufferSize = bufferSize; + } + } +}
