This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b7e227b644151486bea848ed15f3918fdc6ab0e
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jul 28 18:32:18 2021 +0200

    [FLINK-23453][streaming] Integration BufferDebloater to StreamTask
---
 .../partition/consumer/TestInputChannel.java       |  4 ++
 .../flink/streaming/runtime/tasks/StreamTask.java  | 28 +++++++---
 .../streaming/runtime/tasks/StreamTaskTest.java    | 61 +++++++++++++++++++++-
 .../streaming/util/TestStreamEnvironment.java      |  6 +++
 pom.xml                                            |  1 +
 5 files changed, 92 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index ae6753c..b0ea5f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -193,6 +193,10 @@ public class TestInputChannel extends InputChannel {
         currentBufferSize = newBufferSize;
     }
 
+    public int getCurrentBufferSize() {
+        return currentBufferSize;
+    }
+
     @Override
     int getBuffersInUseCount() {
         return buffers.size();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e68df16..ebdfbdb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -79,6 +79,7 @@ import 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHand
 import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater;
 import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension;
@@ -270,6 +271,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
 
     private final ThroughputCalculator throughputCalculator;
 
+    private final BufferDebloater bufferDebloater;
+
+    private final long bufferDebloatPeriod;
+
     // ------------------------------------------------------------------------
 
     /**
@@ -390,6 +395,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
 
         
environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
         this.throughputCalculator = environment.getThroughputMeter();
+        this.bufferDebloatPeriod = 
getTaskConfiguration().get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD);
+
+        this.bufferDebloater =
+                
getTaskConfiguration().get(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)
+                        ? new BufferDebloater(
+                                getTaskConfiguration(), 
getEnvironment().getAllInputGates())
+                        : null;
     }
 
     private TimerService createTimerService(String timerThreadName) {
@@ -674,7 +686,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         // final check to exit early before starting to run
         ensureNotCanceled();
 
-        throughputCalculationSetup();
+        scheduleBufferDebloater();
 
         // let the task do its work
         runMailboxLoop();
@@ -686,17 +698,19 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         afterInvoke();
     }
 
-    void throughputCalculationSetup() {
+    private void scheduleBufferDebloater() {
         systemTimerService.registerTimer(
-                systemTimerService.getCurrentProcessingTime()
-                        + 
getTaskConfiguration().get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD),
+                systemTimerService.getCurrentProcessingTime() + 
bufferDebloatPeriod,
                 timestamp ->
                         mainMailboxExecutor.submit(
                                 () -> {
-                                    throughputCalculator.calculateThroughput();
-                                    throughputCalculationSetup();
+                                    long throughput = 
throughputCalculator.calculateThroughput();
+                                    if (bufferDebloater != null) {
+                                        
bufferDebloater.recalculateBufferSize(throughput);
+                                    }
+                                    scheduleBufferDebloater();
                                 },
-                                "Throughput recalculation"));
+                                "Buffer size recalculation"));
     }
 
     private void runWithCleanUpOnFail(RunnableWithException run) throws 
Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index b9741e1..8f8220c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -48,6 +48,8 @@ import 
org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import 
org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.mailbox.MailboxExecutor;
@@ -146,6 +148,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.StreamCorruptedException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -169,19 +172,21 @@ import static java.util.Collections.singletonList;
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
 import static 
org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD;
+import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
+import static 
org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE;
 import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.nullable;
@@ -1876,6 +1881,60 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testBufferSizeRecalculationStartSuccessfully() throws 
Exception {
+        CountDownLatch secondCalculationLatch = new CountDownLatch(2);
+        int expectedThroughput = 13333;
+        int inputChannels = 3;
+        Consumer<StreamConfig> configuration =
+                (config) -> {
+                    
config.getConfiguration().set(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD, 10);
+                    config.getConfiguration().set(BUFFER_DEBLOAT_TARGET, 
Duration.ofSeconds(1));
+                    config.getConfiguration().set(BUFFER_DEBLOAT_ENABLED, 
true);
+                };
+        SupplierWithException<StreamTask<?, ?>, Exception> testTaskFactory =
+                () -> {
+                    // given: Configured StreamTask with several input channels.
+                    StreamTaskMailboxTestHarnessBuilder<String> builder =
+                            new StreamTaskMailboxTestHarnessBuilder<>(
+                                            OneInputStreamTask::new, 
STRING_TYPE_INFO)
+                                    .modifyStreamConfig(configuration)
+                                    .addInput(STRING_TYPE_INFO, inputChannels)
+                                    .setupOutputForSingletonOperatorChain(
+                                            new 
TestBoundedOneInputStreamOperator());
+                    // and: The throughput meter with predictable calculation 
result.
+                    StreamTaskMailboxTestHarness<String> harness =
+                            builder.setThroughputMeter(
+                                            new ThroughputCalculator(
+                                                    SystemClock.getInstance(), 
10) {
+                                                @Override
+                                                public long 
calculateThroughput() {
+                                                    
secondCalculationLatch.countDown();
+                                                    return expectedThroughput;
+                                                }
+                                            })
+                                    .build();
+                    return harness.streamTask;
+                };
+
+        RunningTask<StreamTask<?, ?>> task = runTask(testTaskFactory);
+
+        // when: The second throughput calculation happens
+        secondCalculationLatch.await();
+
+        // then: We can be sure that after the first throughput calculation the 
buffer size was
+        // changed.
+        for (InputGate inputGate : 
task.streamTask.getEnvironment().getAllInputGates()) {
+            for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
+                assertThat(
+                        ((TestInputChannel) 
inputGate.getChannel(i)).getCurrentBufferSize(),
+                        is(expectedThroughput / inputChannels));
+            }
+        }
+
+        task.streamTask.cancel();
+    }
+
     private MockEnvironment setupEnvironment(boolean... outputAvailabilities) {
         final Configuration configuration = new Configuration();
         new MockStreamConfig(configuration, outputAvailabilities.length);
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 23876c4..6862e49 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
@@ -42,6 +43,8 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
             
Boolean.parseBoolean(System.getProperty("checkpointing.randomization", 
"false"));
     private static final String STATE_CHANGE_LOG_CONFIG =
             System.getProperty("checkpointing.changelog", 
STATE_CHANGE_LOG_CONFIG_UNSET).trim();
+    private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG =
+            
Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", 
"false"));
 
     public TestStreamEnvironment(
             MiniCluster miniCluster,
@@ -97,6 +100,9 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
                             STATE_CHANGE_LOG_CONFIG_RAND)) {
                         randomize(conf, 
CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true, false);
                     }
+                    if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG) {
+                        randomize(conf, 
TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false);
+                    }
                     env.configure(conf, env.getUserClassloader());
                     return env;
                 };
diff --git a/pom.xml b/pom.xml
index 2428296..923fe93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1529,6 +1529,7 @@ under the License.
                                                
<forkNumber>0${surefire.forkNumber}</forkNumber>
                                                
<hadoop.version>${hadoop.version}</hadoop.version>
                                                
<checkpointing.randomization>true</checkpointing.randomization>
+                                               
<buffer-debloat.randomization>true</buffer-debloat.randomization>
                                                <!-- on, unset, or random -->
                                                
<checkpointing.changelog>unset</checkpointing.changelog>
                                                
<project.basedir>${project.basedir}</project.basedir>

Reply via email to