This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b7e227b644151486bea848ed15f3918fdc6ab0e Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Jul 28 18:32:18 2021 +0200 [FLINK-23453][streaming] Integration BufferDebloater to StreamTask --- .../partition/consumer/TestInputChannel.java | 4 ++ .../flink/streaming/runtime/tasks/StreamTask.java | 28 +++++++--- .../streaming/runtime/tasks/StreamTaskTest.java | 61 +++++++++++++++++++++- .../streaming/util/TestStreamEnvironment.java | 6 +++ pom.xml | 1 + 5 files changed, 92 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java index ae6753c..b0ea5f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java @@ -193,6 +193,10 @@ public class TestInputChannel extends InputChannel { currentBufferSize = newBufferSize; } + public int getCurrentBufferSize() { + return currentBufferSize; + } + @Override int getBuffersInUseCount() { return buffers.size(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index e68df16..ebdfbdb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -79,6 +79,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHand import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; 
+import org.apache.flink.streaming.runtime.tasks.bufferdebloat.BufferDebloater; import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension; @@ -270,6 +271,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab private final ThroughputCalculator throughputCalculator; + private final BufferDebloater bufferDebloater; + + private final long bufferDebloatPeriod; + // ------------------------------------------------------------------------ /** @@ -390,6 +395,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true); this.throughputCalculator = environment.getThroughputMeter(); + this.bufferDebloatPeriod = getTaskConfiguration().get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD); + + this.bufferDebloater = + getTaskConfiguration().get(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED) + ? 
new BufferDebloater( + getTaskConfiguration(), getEnvironment().getAllInputGates()) + : null; } private TimerService createTimerService(String timerThreadName) { @@ -674,7 +686,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab // final check to exit early before starting to run ensureNotCanceled(); - throughputCalculationSetup(); + scheduleBufferDebloater(); // let the task do its work runMailboxLoop(); @@ -686,17 +698,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab afterInvoke(); } - void throughputCalculationSetup() { + private void scheduleBufferDebloater() { systemTimerService.registerTimer( - systemTimerService.getCurrentProcessingTime() - + getTaskConfiguration().get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD), + systemTimerService.getCurrentProcessingTime() + bufferDebloatPeriod, timestamp -> mainMailboxExecutor.submit( () -> { - throughputCalculator.calculateThroughput(); - throughputCalculationSetup(); + long throughput = throughputCalculator.calculateThroughput(); + if (bufferDebloater != null) { + bufferDebloater.recalculateBufferSize(throughput); + } + scheduleBufferDebloater(); }, - "Throughput recalculation")); + "Buffer size recalculation")); } private void runWithCleanUpOnFail(RunnableWithException run) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index b9741e1..8f8220c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -48,6 +48,8 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter; import 
org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.mailbox.MailboxExecutor; @@ -146,6 +148,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.StreamCorruptedException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -169,19 +172,21 @@ import static java.util.Collections.singletonList; import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; import static org.apache.flink.configuration.TaskManagerOptions.AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton; import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY; import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning; import static org.apache.flink.util.Preconditions.checkState; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static 
org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.nullable; @@ -1876,6 +1881,60 @@ public class StreamTaskTest extends TestLogger { } } + @Test + public void testBufferSizeRecalculationStartSuccessfully() throws Exception { + CountDownLatch secondCalculationLatch = new CountDownLatch(2); + int expectedThroughput = 13333; + int inputChannels = 3; + Consumer<StreamConfig> configuration = + (config) -> { + config.getConfiguration().set(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD, 10); + config.getConfiguration().set(BUFFER_DEBLOAT_TARGET, Duration.ofSeconds(1)); + config.getConfiguration().set(BUFFER_DEBLOAT_ENABLED, true); + }; + SupplierWithException<StreamTask<?, ?>, Exception> testTaskFactory = + () -> { + // given: Configured StreamTask with one input channel. + StreamTaskMailboxTestHarnessBuilder<String> builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, STRING_TYPE_INFO) + .modifyStreamConfig(configuration) + .addInput(STRING_TYPE_INFO, inputChannels) + .setupOutputForSingletonOperatorChain( + new TestBoundedOneInputStreamOperator()); + // and: The throughput meter with predictable calculation result. + StreamTaskMailboxTestHarness<String> harness = + builder.setThroughputMeter( + new ThroughputCalculator( + SystemClock.getInstance(), 10) { + @Override + public long calculateThroughput() { + secondCalculationLatch.countDown(); + return expectedThroughput; + } + }) + .build(); + return harness.streamTask; + }; + + RunningTask<StreamTask<?, ?>> task = runTask(testTaskFactory); + + // when: The second throughput calculation happens + secondCalculationLatch.await(); + + // then: We can be sure that after the first throughput calculation the buffer size was + // changed. 
+ for (InputGate inputGate : task.streamTask.getEnvironment().getAllInputGates()) { + for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) { + assertThat( + ((TestInputChannel) inputGate.getChannel(i)).getCurrentBufferSize(), + is(expectedThroughput / inputChannels)); + } + } + + task.streamTask.cancel(); + } + private MockEnvironment setupEnvironment(boolean... outputAvailabilities) { final Configuration configuration = new Configuration(); new MockStreamConfig(configuration, outputAvailabilities.length); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 23876c4..6862e49 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; @@ -42,6 +43,8 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { Boolean.parseBoolean(System.getProperty("checkpointing.randomization", "false")); private static final String STATE_CHANGE_LOG_CONFIG = System.getProperty("checkpointing.changelog", STATE_CHANGE_LOG_CONFIG_UNSET).trim(); + private static final boolean RANDOMIZE_BUFFER_DEBLOAT_CONFIG = + Boolean.parseBoolean(System.getProperty("buffer-debloat.randomization", "false")); public TestStreamEnvironment( MiniCluster miniCluster, @@ -97,6 +100,9 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { 
STATE_CHANGE_LOG_CONFIG_RAND)) { randomize(conf, CheckpointingOptions.ENABLE_STATE_CHANGE_LOG, true, false); } + if (RANDOMIZE_BUFFER_DEBLOAT_CONFIG) { + randomize(conf, TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true, false); + } env.configure(conf, env.getUserClassloader()); return env; }; diff --git a/pom.xml b/pom.xml index 2428296..923fe93 100644 --- a/pom.xml +++ b/pom.xml @@ -1529,6 +1529,7 @@ under the License. <forkNumber>0${surefire.forkNumber}</forkNumber> <hadoop.version>${hadoop.version}</hadoop.version> <checkpointing.randomization>true</checkpointing.randomization> + <buffer-debloat.randomization>true</buffer-debloat.randomization> <!-- on, unset, or random --> <checkpointing.changelog>unset</checkpointing.changelog> <project.basedir>${project.basedir}</project.basedir>
