This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9158fc76263b238eeb4a0424a843cb8a59269557
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jul 28 18:25:41 2021 +0200

    [FLINK-23453][streaming] Created the buffer debloater for the ability to 
automatically change the buffer size based on the throughput.
---
 .../runtime/throughput/ThroughputCalculator.java   |   5 +-
 .../tasks/bufferdebloat/BufferDebloater.java       |  99 +++++++++++++
 .../tasks/bufferdebloat/BufferDebloaterTest.java   | 163 +++++++++++++++++++++
 3 files changed, 264 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
index 83aa408..e6911f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/ThroughputCalculator.java
@@ -29,7 +29,6 @@ public class ThroughputCalculator {
     private long currentAccumulatedDataSize;
     private long currentMeasurementTime;
     private long measurementStartTime = NOT_TRACKED;
-    private long lastThroughput;
 
     public ThroughputCalculator(Clock clock, int numberOfSamples) {
         this.clock = clock;
@@ -62,13 +61,13 @@ public class ThroughputCalculator {
             currentMeasurementTime += clock.relativeTimeMillis() - 
measurementStartTime;
         }
 
-        lastThroughput =
+        long throughput =
                 throughputEMA.calculateThroughput(
                         currentAccumulatedDataSize, currentMeasurementTime);
 
         measurementStartTime = clock.relativeTimeMillis();
         currentAccumulatedDataSize = currentMeasurementTime = 0;
 
-        return lastThroughput;
+        return throughput;
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
new file mode 100644
index 0000000..8b2b175
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloater.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.bufferdebloat;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+
+import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET;
+import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Class for automatic calculation of the buffer size based on the current 
throughput and
+ * configuration.
+ */
+public class BufferDebloater {
+    private static final double MILLIS_IN_SECOND = 1000.0;
+
+    /**
+     * How large the total buffer size should be compared to the throughput
+     * (when it is 1.0 then bufferSize == throughput).
+     */
+    private final double targetBufferSizeFactor;
+
+    private final IndexedInputGate[] inputGates;
+    private final long maxBufferSize;
+    private final long minBufferSize;
+    private final int bufferDebloatThresholdPercentages;
+
+    private int lastBufferSize;
+
+    public BufferDebloater(Configuration taskConfig, IndexedInputGate[] 
inputGates) {
+        this.inputGates = inputGates;
+        this.targetBufferSizeFactor =
+                taskConfig.get(BUFFER_DEBLOAT_TARGET).toMillis() / 
MILLIS_IN_SECOND;
+        this.maxBufferSize = 
taskConfig.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
+        this.minBufferSize = 
taskConfig.get(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE).getBytes();
+
+        this.bufferDebloatThresholdPercentages =
+                taskConfig.getInteger(BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES);
+
+        this.lastBufferSize = (int) maxBufferSize;
+
+        // Right now the buffer size cannot be greater than integer max value
+        // according to MemorySegment and buffer implementation.
+        checkArgument(maxBufferSize <= Integer.MAX_VALUE);
+        checkArgument(maxBufferSize > 0);
+        checkArgument(minBufferSize > 0);
+        checkArgument(maxBufferSize >= minBufferSize);
+        checkArgument(targetBufferSizeFactor > 0.0);
+    }
+
+    public void recalculateBufferSize(long currentThroughput) {
+        long desiredTotalBufferSizeInBytes = (long) (currentThroughput * 
targetBufferSizeFactor);
+
+        int totalNumber = 0;
+        for (IndexedInputGate inputGate : inputGates) {
+            totalNumber += Math.max(1, inputGate.getBuffersInUseCount());
+        }
+        int newSize =
+                (int)
+                        Math.max(
+                                minBufferSize,
+                                Math.min(
+                                        desiredTotalBufferSizeInBytes / 
totalNumber,
+                                        maxBufferSize));
+
+        boolean skipUpdate =
+                Math.abs(1 - ((double) lastBufferSize) / newSize) * 100
+                        < bufferDebloatThresholdPercentages;
+
+        // Skip update if the new value is pretty close to the old one.
+        if (skipUpdate) {
+            return;
+        }
+
+        lastBufferSize = newSize;
+        for (IndexedInputGate inputGate : inputGates) {
+            inputGate.announceBufferSize(newSize);
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
new file mode 100644
index 0000000..853f6c2
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/bufferdebloat/BufferDebloaterTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.bufferdebloat;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.runtime.io.MockInputGate;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.BYTES;
+import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
+import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET;
+import static 
org.apache.flink.configuration.TaskManagerOptions.MEMORY_SEGMENT_SIZE;
+import static 
org.apache.flink.configuration.TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Test for {@link BufferDebloater}. */
+public class BufferDebloaterTest extends TestLogger {
+
+    @Test
+    public void testZeroBuffersInUse() {
+        // If a gate reports zero buffers in use, the count is treated as 1.
+        testBufferSizeCalculation(3, asList(0, 1, 0), 3333, 50, 2400, 1000, 
1111);
+    }
+
+    @Test
+    public void testCorrectBufferSizeCalculation() {
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, 1200, 
249);
+    }
+
+    @Test
+    public void testCalculatedBufferSizeLessThanMin() {
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 250, 1100, 1200, 
250);
+    }
+
+    @Test
+    public void testCalculatedBufferSizeForThroughputZero() {
+        // When the throughput is zero then min buffer size will be taken.
+        testBufferSizeCalculation(3, asList(3, 5, 8), 0, 50, 1100, 1200, 50);
+    }
+
+    @Test
+    public void testConfiguredConsumptionTimeIsTooLow() {
+        // When the consumption time is low then min buffer size will be taken.
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, 7, 50);
+    }
+
+    @Test
+    public void testCalculatedBufferSizeGreaterThanMax() {
+        // The newly calculated buffer size exceeds the max value, so the max
+        // value is taken. No update should happen (-1 means the gate keeps
+        // its initial value) because the old value equals the new value.
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 248, 1200, -1);
+    }
+
+    @Test
+    public void testCalculatedBufferSlightlyDifferentFromCurrentOne() {
+        // The newly calculated buffer size is slightly less than the current
+        // value (which equals the max value), so no update should happen (-1
+        // means the gate keeps its initial value): the change is too small.
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 250, 1200, -1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeMinBufferSize() {
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, -1, 248, 1200, 
248);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeMaxBufferSize() {
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, -1, 1200, 248);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMinGreaterThanMaxBufferSize() {
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 49, 1200, 248);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeConsumptionTime() {
+        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, -1, 248);
+    }
+
+    private void testBufferSizeCalculation(
+            int numberOfGates,
+            List<Integer> numberOfBuffersInUse,
+            long throughput,
+            long minBufferSize,
+            long maxBufferSize,
+            int consumptionTime,
+            long expectedBufferSize) {
+        TestBufferSizeInputGate[] inputGates = new 
TestBufferSizeInputGate[numberOfGates];
+        for (int i = 0; i < numberOfGates; i++) {
+            inputGates[i] = new 
TestBufferSizeInputGate(numberOfBuffersInUse.get(i));
+        }
+
+        BufferDebloater bufferDebloater =
+                new BufferDebloater(
+                        new Configuration()
+                                .set(BUFFER_DEBLOAT_ENABLED, true)
+                                .set(BUFFER_DEBLOAT_TARGET, 
Duration.ofMillis(consumptionTime))
+                                .set(
+                                        MEMORY_SEGMENT_SIZE,
+                                        MemorySize.parse("" + maxBufferSize, 
BYTES))
+                                .set(
+                                        MIN_MEMORY_SEGMENT_SIZE,
+                                        MemorySize.parse("" + minBufferSize, 
BYTES)),
+                        inputGates);
+
+        // when: Buffer size is calculated.
+        bufferDebloater.recalculateBufferSize(throughput);
+
+        // then: Buffer size in all gates should be as expected.
+        for (int i = 0; i < numberOfGates; i++) {
+            assertThat(inputGates[i].lastBufferSize, is(expectedBufferSize));
+        }
+    }
+
+    private static class TestBufferSizeInputGate extends MockInputGate {
+        private long lastBufferSize = -1;
+        private final int bufferInUseCount;
+
+        public TestBufferSizeInputGate(int bufferInUseCount) {
+            // The number of channels does not matter here.
+            super(1, Collections.emptyList());
+            this.bufferInUseCount = bufferInUseCount;
+        }
+
+        @Override
+        public int getBuffersInUseCount() {
+            return bufferInUseCount;
+        }
+
+        @Override
+        public void announceBufferSize(int bufferSize) {
+            lastBufferSize = bufferSize;
+        }
+    }
+}

Reply via email to