This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 69ead04f3f0e6577e5eae45ed052937e163b1567
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Wed Sep 22 18:37:06 2021 +0200

    [FLINK-24230] Buffer debloating microbenchmark for single gate
    
    Added a test that first does an environment setup:
    1. Submits a job
    2. Waits for it to become backpressured
    3. Waits some extra time for buffer debloating to stabilize
    
    The above steps are not measured. After that we trigger a single
    checkpoint and measure the time it takes to complete.
---
 .../benchmark/CheckpointingTimeBenchmark.java      | 291 +++++++++++++++++++++
 .../UnalignedCheckpointTimeBenchmark.java          | 138 ----------
 .../flink/benchmark/operators/RecordSource.java    |  23 +-
 3 files changed, 306 insertions(+), 146 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
new file mode 100644
index 0000000..481999b
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.benchmark.operators.RecordSource;
+import org.apache.flink.benchmark.operators.RecordSource.Record;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.net.URI;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
+
+/**
+ * The test verifies that buffer debloating kicks in and properly downsizes buffers. In the end a
+ * checkpoint should take ~2 (the number of rebalance exchanges) * DEBLOATING_TARGET.
+ *
+ * <p>Some info about the chosen numbers:
+ *
+ * <ul>
+ *   <li>The minimal memory segment size is decreased (to 256 b) so that there is more room for
+ *       scaling buffers down. Memory segments start at 4 kb.
+ *   <li>A memory segment of the minimal size fits ~3 records (of size 64 b); each record takes
+ *       ~1 ms to be processed by the sink.
+ *   <li>We have 2 (exclusive buffers) * 4 (parallelism) + 8 floating = 16 buffers per gate, i.e.
+ *       64 buffers across the 4 parallel gates. With a 300 ms debloating target and ~1 ms/record
+ *       processing speed, we can buffer 300/64 = ~4.7 records in a buffer after debloating,
+ *       which means the size of a buffer is slightly above the minimal memory segment size
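+ *       (~4.7 records * 64 b/record = ~300 b per buffer, vs. the 256 b minimum segment size)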
+ *   <li>The buffer debloating target of 300 ms means a checkpoint should take ~2 (the number of
+ *       exchanges) * 300 ms = ~600 ms
+ * </ul>
+ */
+@OutputTimeUnit(SECONDS)
+@Warmup(iterations = 4)
+public class CheckpointingTimeBenchmark extends BenchmarkBase {
+    public static final int JOB_PARALLELISM = 4;
+    public static final MemorySize START_MEMORY_SEGMENT_SIZE = MemorySize.parse("4 kb");
+    public static final MemorySize MIN_MEMORY_SEGMENT_SIZE = MemorySize.parse("256 b");
+    public static final Duration DEBLOATING_TARGET = Duration.of(300, ChronoUnit.MILLIS);
+    public static final MemorySize DEBLOATING_RECORD_SIZE = MemorySize.parse("64b");
+    public static final MemorySize UNALIGNED_RECORD_SIZE = MemorySize.parse("1kb");
+    public static final int DEBLOATING_STABILIZATION_PERIOD = 2_000;
+
+    public static void main(String[] args) throws RunnerException {
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(CheckpointingTimeBenchmark.class.getCanonicalName())
+                        .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void checkpointSingleInput(DebloatedCheckpointEnvironmentContext context)
+            throws Exception {
+        // The setup (job submission, backpressure, debloating stabilization) happens in the
+        // context; only triggering the checkpoint and awaiting its completion is measured.
+        final CompletableFuture<String> checkpoint =
+                context.miniCluster.triggerCheckpoint(context.jobID);
+        checkpoint.get();
+    }
+
+    public enum CheckpointMode {
+        UNALIGNED(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, START_MEMORY_SEGMENT_SIZE);
+                    config.set(
+                            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            Duration.ofMillis(0));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
+                    return config;
+                },
+                0,
+                UNALIGNED_RECORD_SIZE),
+        UNALIGNED_1(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, START_MEMORY_SEGMENT_SIZE);
+                    config.set(
+                            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
+                            Duration.ofMillis(1));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
+                    return config;
+                },
+                0,
+                UNALIGNED_RECORD_SIZE),
+        ALIGNED(
+                config -> {
+                    config.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
+                    config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, START_MEMORY_SEGMENT_SIZE);
+                    config.set(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE, MIN_MEMORY_SEGMENT_SIZE);
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_TARGET, DEBLOATING_TARGET);
+                    config.set(
+                            TaskManagerOptions.BUFFER_DEBLOAT_PERIOD,
+                            Duration.of(10, ChronoUnit.MILLIS));
+                    config.set(TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES, 5);
+                    return config;
+                },
+                DEBLOATING_STABILIZATION_PERIOD,
+                DEBLOATING_RECORD_SIZE);
+
+        private final Function<Configuration, Configuration> configFunc;
+        private final int sleepPostSetUp;
+        private final MemorySize recordSize;
+
+        CheckpointMode(
+                Function<Configuration, Configuration> configFunc,
+                int sleepPostSetUp,
+                MemorySize recordSize) {
+            this.configFunc = configFunc;
+            this.sleepPostSetUp = sleepPostSetUp;
+            this.recordSize = recordSize;
+        }
+
+        public Configuration configure(Configuration config) {
+            return configFunc.apply(config);
+        }
+
+        public MemorySize getRecordSize() {
+            return recordSize;
+        }
+
+        public int getSleepPostSetUp() {
+            return sleepPostSetUp;
+        }
+    }
+
+    @State(Scope.Thread)
+    public static class DebloatedCheckpointEnvironmentContext extends FlinkEnvironmentContext {
+        public JobID jobID;
+
+        @Param({"ALIGNED", "UNALIGNED", "UNALIGNED_1"})
+        public CheckpointMode mode;
+
+        @Override
+        public void setUp() throws Exception {
+            super.setUp();
+            env.setParallelism(JOB_PARALLELISM);
+            // Effectively disables periodic checkpointing; checkpoints are triggered
+            // manually from the benchmark method.
+            env.enableCheckpointing(Long.MAX_VALUE);
+
+            DataStreamSource<Record> source =
+                    env.fromSource(
+                            new RecordSource(
+                                    Integer.MAX_VALUE, (int) mode.getRecordSize().getBytes()),
+                            noWatermarks(),
+                            RecordSource.class.getName());
+
+            source.slotSharingGroup("source")
+                    .rebalance()
+                    .map((MapFunction<Record, Record>) value -> value)
+                    .slotSharingGroup("map")
+                    .rebalance()
+                    .addSink(new SlowDiscardSink<>())
+                    .slotSharingGroup("sink");
+
+            final JobVertexID sourceId = extractSourceId();
+            final JobClient jobClient = env.executeAsync();
+            jobID = jobClient.getJobID();
+            CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);
+            waitForBackpressure(jobID, sourceId);
+            if (mode.getSleepPostSetUp() > 0) {
+                Thread.sleep(mode.getSleepPostSetUp());
+            }
+        }
+
+        private JobVertexID extractSourceId() {
+            return env.getStreamGraph(false)
+                    .getJobGraph()
+                    .getVerticesSortedTopologicallyFromSources()
+                    .get(0)
+                    .getID();
+        }
+
+        private void waitForBackpressure(JobID jobID, JobVertexID sourceId) throws Exception {
+            final RestClient restClient =
+                    new RestClient(
+                            new UnmodifiableConfiguration(new Configuration()),
+                            Executors.newSingleThreadScheduledExecutor(
+                                    new ExecutorThreadFactory("Flink-RestClient-IO")));
+            final URI restAddress = miniCluster.getRestAddress().get();
+            final JobVertexMessageParameters metricsParameters = new JobVertexMessageParameters();
+            metricsParameters.jobPathParameter.resolve(jobID);
+            metricsParameters.jobVertexIdPathParameter.resolve(sourceId);
+            JobVertexBackPressureInfo responseBody;
+            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+            // Poll the source vertex's back pressure status via REST until it reports HIGH
+            // or the deadline expires.
+            do {
+                responseBody =
+                        restClient
+                                .sendRequest(
+                                        restAddress.getHost(),
+                                        restAddress.getPort(),
+                                        JobVertexBackPressureHeaders.getInstance(),
+                                        metricsParameters,
+                                        EmptyRequestBody.getInstance())
+                                .get();
+            } while (responseBody.getBackpressureLevel()
+                            != JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH
+                    && deadline.hasTimeLeft());
+            if (responseBody.getBackpressureLevel()
+                    != JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH) {
+                throw new FlinkRuntimeException(
+                        "Could not trigger backpressure for the job in the given time.");
+            }
+        }
+
+        @Override
+        protected Configuration createConfiguration() {
+            return mode.configure(super.createConfiguration());
+        }
+
+        @Override
+        protected int getNumberOfTaskManagers() {
+            // 3 vertices (source, map, sink) * JOB_PARALLELISM, with one slot per TaskManager.
+            return 3 * JOB_PARALLELISM;
+        }
+
+        @Override
+        protected int getNumberOfSlotsPerTaskManager() {
+            return 1;
+        }
+    }
+
+    /**
+     * A custom sink that processes records slowly (~1 ms each) so that in-flight buffers
+     * accumulate and back pressure builds up.
+     */
+    public static class SlowDiscardSink<T> implements SinkFunction<T> {
+        @Override
+        public void invoke(T value, Context context) throws Exception {
+            Thread.sleep(1);
+        }
+    }
+}

diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
deleted file mode 100644
index d083ae6..0000000
--- a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.benchmark.operators.RecordSource;
-import org.apache.flink.benchmark.operators.RecordSource.Record;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-import org.openjdk.jmh.runner.options.VerboseMode;
-
-import java.time.Duration;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
-
-/**
- * The benchmark for measuring the time taken to finish the configured number of unaligned
- * checkpoints.
- */
-@OutputTimeUnit(SECONDS)
-@OperationsPerInvocation(UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
-public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
-    public static final int NUM_FINISHED_CHECKPOINTS = 10;
-    private static final int NUM_VERTICES = 3;
-    private static final int PARALLELISM = 4;
-    private static final long CHECKPOINT_INTERVAL_MS = 10;
-
-    public static void main(String[] args) throws RunnerException {
-        Options options =
-                new OptionsBuilder()
-                        .verbosity(VerboseMode.NORMAL)
-                        .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
-                        .build();
-
-        new Runner(options).run();
-    }
-
-    @Benchmark
-    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext context)
-            throws Exception {
-        StreamExecutionEnvironment env = context.env;
-        DataStreamSource<Record> source =
-                env.fromSource(
-                        new RecordSource(NUM_FINISHED_CHECKPOINTS),
-                        noWatermarks(),
-                        RecordSource.class.getName());
-
-        source.slotSharingGroup("source")
-                .rebalance()
-                .map((MapFunction<Record, Record>) value -> value)
-                .slotSharingGroup("map")
-                .rebalance()
-                .addSink(new SlowDiscardSink<>())
-                .slotSharingGroup("sink");
-
-        env.execute();
-    }
-
-    public static class UnalignedCheckpointEnvironmentContext extends FlinkEnvironmentContext {
-
-        @Param({"0", "1", "ALIGNED"})
-        public String timeout = "0";
-
-        @Override
-        public void setUp() throws Exception {
-            super.setUp();
-
-            env.setParallelism(parallelism);
-            env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
-            if ("ALIGNED".equals(timeout)) {
-                env.getCheckpointConfig().enableUnalignedCheckpoints(false);
-            } else {
-                env.getCheckpointConfig().enableUnalignedCheckpoints(true);
-                env.getCheckpointConfig()
-                        .setAlignmentTimeout(Duration.ofMillis(Integer.parseInt(timeout)));
-            }
-        }
-
-        @Override
-        protected int getNumberOfSlotsPerTaskManager() {
-            return 1;
-        }
-
-        @Override
-        protected int getNumberOfTaskManagers() {
-            // why is this using PARALLELISM when we don't actually use it?
-            return NUM_VERTICES * PARALLELISM;
-        }
-
-        protected Configuration createConfiguration() {
-            Configuration conf = super.createConfiguration();
-            conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, new MemorySize(1024 * 4));
-            return conf;
-        }
-    }
-
-    /**
-     * The custom sink for processing records slowly to cause accumulate in-flight buffers even back
-     * pressure.
-     */
-    public static class SlowDiscardSink<T> implements SinkFunction<T> {
-
-        @Override
-        public void invoke(T value, Context context) throws Exception {
-            Thread.sleep(1);
-        }
-    }
-}

diff --git a/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java b/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java
index b2e396d..5f8c610 100644
--- a/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java
+++ b/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java
@@ -41,26 +41,32 @@ import java.util.concurrent.CompletableFuture;
 
 /** A source that generates longs in a fixed number of splits. */
 public class RecordSource implements Source<Record, EmptySplit, EmptyEnumeratorState> {
-    public static final int PAYLOAD_SIZE = 1024;
+    public static final int DEFAULT_PAYLOAD_SIZE = 1024;
+    private final int recordSize;
 
     public static class Record {
         public long value;
         public byte[] payload;
 
         public Record() {
-            this(0);
+            this(0, DEFAULT_PAYLOAD_SIZE);
         }
 
-        public Record(long value) {
+        public Record(long value, int recordSize) {
             this.value = value;
-            payload = new byte[PAYLOAD_SIZE];
+            payload = new byte[recordSize];
         }
     }
 
     private final int minCheckpoints;
 
     public RecordSource(int minCheckpoints) {
+        this(minCheckpoints, DEFAULT_PAYLOAD_SIZE);
+    }
+
+    public RecordSource(int minCheckpoints, int recordSize) {
         this.minCheckpoints = minCheckpoints;
+        this.recordSize = recordSize;
     }
 
     @Override
@@ -70,7 +76,7 @@ public class RecordSource implements Source<Record, EmptySplit, EmptyEnumeratorS
 
     @Override
     public SourceReader<Record, EmptySplit> createReader(SourceReaderContext readerContext) {
-        return new RecordSourceReader(minCheckpoints);
+        return new RecordSourceReader(minCheckpoints, recordSize);
     }
 
     @Override
@@ -97,11 +103,13 @@ public class RecordSource implements Source<Record, EmptySplit, EmptyEnumeratorS
     public static class RecordSourceReader implements SourceReader<Record, EmptySplit> {
         private final int minCheckpoints;
+        private final int recordSize;
         private int numCompletedCheckpoints;
         private long counter = 0;
 
-        public RecordSourceReader(int minCheckpoints) {
+        public RecordSourceReader(int minCheckpoints, int recordSize) {
             this.minCheckpoints = minCheckpoints;
+            this.recordSize = recordSize;
         }
 
         @Override
@@ -109,7 +117,7 @@ public class RecordSource implements Source<Record, EmptySplit, EmptyEnumeratorS
 
     @Override
     public InputStatus pollNext(ReaderOutput<Record> output) throws InterruptedException {
-        output.collect(new Record(counter++));
+        output.collect(new Record(counter++, recordSize));
 
         if (numCompletedCheckpoints >= minCheckpoints) {
             return InputStatus.END_OF_INPUT;
@@ -180,7 +188,6 @@ public class RecordSource implements Source<Record, EmptySplit, EmptyEnumeratorS
     private static class EnumeratorVersionedSerializer
             implements SimpleVersionedSerializer<EmptyEnumeratorState> {
-
         @Override
         public int getVersion() {
             return 0;

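As a back-of-the-envelope check of the expected result for the debloated ALIGNED case, the
arithmetic from the class Javadoc above can be replayed in a few lines. This is an illustrative
sketch only (the DebloatingExpectation class is not part of the patch); all constants mirror
those configured in CheckpointingTimeBenchmark:

    // Illustrative only: replays the Javadoc arithmetic of CheckpointingTimeBenchmark.
    public final class DebloatingExpectation {
        public static void main(String[] args) {
            int exchanges = 2;              // source->map and map->sink rebalances
            long debloatTargetMs = 300;     // TaskManagerOptions.BUFFER_DEBLOAT_TARGET
            int buffersPerGate = 2 * 4 + 8; // exclusive buffers * channels + floating = 16
            int gates = 4;                  // one input gate per downstream subtask
            long msPerRecord = 1;           // SlowDiscardSink sleeps ~1 ms per record

            // Records that fit in one buffer once debloating has converged.
            double recordsPerBuffer =
                    (double) debloatTargetMs / (buffersPerGate * gates) / msPerRecord;

            // Expected: ~600 ms checkpoint duration, ~4.7 records (~300 b) per buffer.
            System.out.printf(
                    "checkpoint: ~%d ms, records/buffer: ~%.1f%n",
                    exchanges * debloatTargetMs, recordsPerBuffer);
        }
    }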