This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new 9cf75d5 [FLINK-22973] Provide unaligned checkpoint with timeouts
benchmark
9cf75d5 is described below
commit 9cf75d5115cfe40a73724c6e5a22c48b4c8e403d
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed May 5 15:01:28 2021 +0200
[FLINK-22973] Provide unaligned checkpoint with timeouts benchmark
---
.../org/apache/flink/benchmark/BenchmarkBase.java | 16 +-
.../UnalignedCheckpointTimeBenchmark.java | 120 +++++-------
.../flink/benchmark/operators/RecordSource.java | 217 +++++++++++++++++++++
3 files changed, 272 insertions(+), 81 deletions(-)
diff --git a/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java
index 1e47ab8..676e5fb 100644
--- a/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java
@@ -33,12 +33,14 @@ import static org.openjdk.jmh.annotations.Scope.Thread;
@State(Thread)
@OutputTimeUnit(MILLISECONDS)
@BenchmarkMode(Throughput)
-@Fork(value = 3, jvmArgsAppend = {
- "-Djava.rmi.server.hostname=127.0.0.1",
- "-Dcom.sun.management.jmxremote.authenticate=false",
- "-Dcom.sun.management.jmxremote.ssl=false",
- "-Dcom.sun.management.jmxremote.ssl"})
+@Fork(
+ value = 3,
+ jvmArgsAppend = {
+ "-Djava.rmi.server.hostname=127.0.0.1",
+ "-Dcom.sun.management.jmxremote.authenticate=false",
+ "-Dcom.sun.management.jmxremote.ssl=false",
+ "-Dcom.sun.management.jmxremote.ssl"
+ })
@Warmup(iterations = 10)
@Measurement(iterations = 10)
-public class BenchmarkBase {
-}
+public class BenchmarkBase {}
diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
index df736b8..093ad53 100644
--- a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -19,14 +19,15 @@
package org.apache.flink.benchmark;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.benchmark.operators.RecordSource;
+import org.apache.flink.benchmark.operators.RecordSource.Record;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
@@ -42,45 +43,56 @@ import org.openjdk.jmh.runner.options.VerboseMode;
import java.io.IOException;
import java.time.Duration;
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
/**
- * The benchmark for measuring the time taken to finish the configured number of
- * unaligned checkpoints.
+ * The benchmark for measuring the time taken to finish the configured number of unaligned
+ * checkpoints.
*/
-@OutputTimeUnit(MINUTES)
-@OperationsPerInvocation(value = UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+@OutputTimeUnit(SECONDS)
+@OperationsPerInvocation(UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
- public static final int NUM_FINISHED_CHECKPOINTS = 5;
+ public static final int NUM_FINISHED_CHECKPOINTS = 10;
private static final int NUM_VERTICES = 3;
private static final int PARALLELISM = 4;
private static final long CHECKPOINT_INTERVAL_MS = 10;
public static void main(String[] args) throws RunnerException {
- Options options = new OptionsBuilder()
- .verbosity(VerboseMode.NORMAL)
- .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
- .build();
+ Options options =
+ new OptionsBuilder()
+ .verbosity(VerboseMode.NORMAL)
+ .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
+ .build();
new Runner(options).run();
}
@Benchmark
- public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext context) throws Exception {
+ public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext context)
+ throws Exception {
StreamExecutionEnvironment env = context.env;
- DataStreamSource<byte[]> source = env.addSource(new FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS));
- source
- .slotSharingGroup("source").rebalance()
- .map((MapFunction<byte[], byte[]>) value -> value).slotSharingGroup("map").rebalance()
- .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+ DataStreamSource<Record> source =
+ env.fromSource(
+ new RecordSource(NUM_FINISHED_CHECKPOINTS),
+ noWatermarks(),
+ RecordSource.class.getName());
+
+ source.slotSharingGroup("source")
+ .rebalance()
+ .map((MapFunction<Record, Record>) value -> value)
+ .slotSharingGroup("map")
+ .rebalance()
+ .addSink(new SlowDiscardSink<>())
+ .slotSharingGroup("sink");
env.execute();
}
public static class UnalignedCheckpointEnvironmentContext extends FlinkEnvironmentContext {
- @Param({"REMOTE", "LOCAL"})
- public String mode = "REMOTE";
+ @Param({"0", "1", "ALIGNED"})
+ public String timeout = "0";
@Setup
public void setUp() throws IOException {
@@ -88,73 +100,33 @@ public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
env.setParallelism(parallelism);
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
- env.getCheckpointConfig().enableUnalignedCheckpoints(true);
- env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
+ if ("ALIGNED".equals(timeout)) {
+ env.getCheckpointConfig().enableUnalignedCheckpoints(false);
+ } else {
+ env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+ env.getCheckpointConfig()
+ .setAlignmentTimeout(Duration.ofMillis(Integer.parseInt(timeout)));
+ }
}
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
-
- if (mode.equals("REMOTE")) {
- conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
- conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_VERTICES * PARALLELISM);
- } else {
- conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_VERTICES * PARALLELISM);
- conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- }
+ conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+ conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, new MemorySize(1024 * 4));
+ conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_VERTICES * PARALLELISM);
return conf;
}
}
/**
- * The source for finishing the configured number of checkpoints before exiting.
- */
- public static class FiniteCheckpointSource extends RichParallelSourceFunction<byte[]> implements CheckpointListener {
-
- private final int numExpectedCheckpoints;
- private final byte[] bytes = new byte[1024];
-
- private volatile boolean running = true;
- private volatile int numFinishedCheckpoints;
-
- FiniteCheckpointSource(int numCheckpoints) {
- this.numExpectedCheckpoints = numCheckpoints;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- ++numFinishedCheckpoints;
- }
-
-
- @Override
- public void run(SourceContext<byte[]> ctx) {
- while (running) {
- synchronized (ctx.getCheckpointLock()) {
- ctx.collect(bytes);
-
- if (numFinishedCheckpoints >= numExpectedCheckpoints) {
- cancel();
- }
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- /**
- * The custom sink for processing records slowly to cause accumulate in-flight
- * buffers even back pressure.
+ * The custom sink for processing records slowly to cause accumulate in-flight buffers even back
+ * pressure.
*/
public static class SlowDiscardSink<T> implements SinkFunction<T> {
@Override
- public void invoke(T value, SinkFunction.Context context) throws Exception {
- Thread.sleep(3);
+ public void invoke(T value, Context context) throws Exception {
+ Thread.sleep(1);
}
}
}
diff --git a/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java b/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java
new file mode 100644
index 0000000..b5c1e46
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.operators;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.benchmark.operators.RecordSource.EmptyEnumeratorState;
+import org.apache.flink.benchmark.operators.RecordSource.EmptySplit;
+import org.apache.flink.benchmark.operators.RecordSource.Record;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** A source that generates longs in a fixed number of splits. */
+public class RecordSource implements Source<Record, EmptySplit, EmptyEnumeratorState> {
+ public static final int PAYLOAD_SIZE = 1024;
+
+ public static class Record {
+ public long value;
+ public byte[] payload;
+
+ public Record() {
+ this(0);
+ }
+
+ public Record(long value) {
+ this.value = value;
+ payload = new byte[PAYLOAD_SIZE];
+ }
+ }
+
+ private final int minCheckpoints;
+
+ public RecordSource(int minCheckpoints) {
+ this.minCheckpoints = minCheckpoints;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SourceReader<Record, EmptySplit> createReader(SourceReaderContext readerContext) {
+ return new RecourdSourceReader(minCheckpoints);
+ }
+
+ @Override
+ public SplitEnumerator<EmptySplit, EmptyEnumeratorState> createEnumerator(
+ SplitEnumeratorContext<EmptySplit> enumContext) {
+ return new EmptySplitSplitEnumerator();
+ }
+
+ @Override
+ public SplitEnumerator<EmptySplit, EmptyEnumeratorState> restoreEnumerator(
+ SplitEnumeratorContext<EmptySplit> enumContext, EmptyEnumeratorState state) {
+ return new EmptySplitSplitEnumerator();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<EmptySplit> getSplitSerializer() {
+ return new SplitVersionedSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<EmptyEnumeratorState> getEnumeratorCheckpointSerializer() {
+ return new EnumeratorVersionedSerializer();
+ }
+
+ public static class RecourdSourceReader implements SourceReader<Record, EmptySplit> {
+ private final int minCheckpoints;
+ private int numCompletedCheckpoints;
+ private long counter = 0;
+
+ public RecourdSourceReader(int minCheckpoints) {
+ this.minCheckpoints = minCheckpoints;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Record> output) throws InterruptedException {
+ output.collect(new Record(counter++));
+
+ if (numCompletedCheckpoints >= minCheckpoints) {
+ return InputStatus.END_OF_INPUT;
+ }
+
+ return InputStatus.MORE_AVAILABLE;
+ }
+
+ @Override
+ public List<EmptySplit> snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numCompletedCheckpoints++;
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return FutureUtils.completedVoidFuture();
+ }
+
+ @Override
+ public void addSplits(List<EmptySplit> splits) {}
+
+ @Override
+ public void notifyNoMoreSplits() {}
+
+ @Override
+ public void close() throws Exception {}
+ }
+
+ public static class EmptySplit implements SourceSplit {
+ @Override
+ public String splitId() {
+ return "42";
+ }
+ }
+
+ private static class EmptySplitSplitEnumerator
+ implements SplitEnumerator<EmptySplit, EmptyEnumeratorState> {
+ @Override
+ public void start() {}
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
+
+ @Override
+ public void addSplitsBack(List<EmptySplit> splits, int subtaskId) {}
+
+ @Override
+ public void addReader(int subtaskId) {}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {}
+
+ @Override
+ public EmptyEnumeratorState snapshotState(long checkpointId) throws Exception {
+ return new EmptyEnumeratorState();
+ }
+
+ @Override
+ public void close() throws IOException {}
+ }
+
+ public static class EmptyEnumeratorState {}
+
+ private static class EnumeratorVersionedSerializer
+ implements SimpleVersionedSerializer<EmptyEnumeratorState> {
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(EmptyEnumeratorState state) {
+ return new byte[0];
+ }
+
+ @Override
+ public EmptyEnumeratorState deserialize(int version, byte[] serialized) {
+ return new EmptyEnumeratorState();
+ }
+ }
+
+ private static class SplitVersionedSerializer implements SimpleVersionedSerializer<EmptySplit> {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(EmptySplit split) {
+ return new byte[0];
+ }
+
+ @Override
+ public EmptySplit deserialize(int version, byte[] serialized) {
+ return new EmptySplit();
+ }
+ }
+}