This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cf75d5  [FLINK-22973] Provide unaligned checkpoint with timeouts benchmark
9cf75d5 is described below

commit 9cf75d5115cfe40a73724c6e5a22c48b4c8e403d
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed May 5 15:01:28 2021 +0200

    [FLINK-22973] Provide unaligned checkpoint with timeouts benchmark
---
 .../org/apache/flink/benchmark/BenchmarkBase.java  |  16 +-
 .../UnalignedCheckpointTimeBenchmark.java          | 120 +++++-------
 .../flink/benchmark/operators/RecordSource.java    | 217 +++++++++++++++++++++
 3 files changed, 272 insertions(+), 81 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java 
b/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java
index 1e47ab8..676e5fb 100644
--- a/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/BenchmarkBase.java
@@ -33,12 +33,14 @@ import static org.openjdk.jmh.annotations.Scope.Thread;
 @State(Thread)
 @OutputTimeUnit(MILLISECONDS)
 @BenchmarkMode(Throughput)
-@Fork(value = 3, jvmArgsAppend = {
-               "-Djava.rmi.server.hostname=127.0.0.1",
-               "-Dcom.sun.management.jmxremote.authenticate=false",
-               "-Dcom.sun.management.jmxremote.ssl=false",
-               "-Dcom.sun.management.jmxremote.ssl"})
+@Fork(
+        value = 3,
+        jvmArgsAppend = {
+            "-Djava.rmi.server.hostname=127.0.0.1",
+            "-Dcom.sun.management.jmxremote.authenticate=false",
+            "-Dcom.sun.management.jmxremote.ssl=false",
+            "-Dcom.sun.management.jmxremote.ssl"
+        })
 @Warmup(iterations = 10)
 @Measurement(iterations = 10)
-public class BenchmarkBase {
-}
+public class BenchmarkBase {}
diff --git 
a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
 
b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
index df736b8..093ad53 100644
--- 
a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ 
b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -19,14 +19,15 @@
 package org.apache.flink.benchmark;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.benchmark.operators.RecordSource;
+import org.apache.flink.benchmark.operators.RecordSource.Record;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
@@ -42,45 +43,56 @@ import org.openjdk.jmh.runner.options.VerboseMode;
 import java.io.IOException;
 import java.time.Duration;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 
 /**
- * The benchmark for measuring the time taken to finish the configured number 
of
- * unaligned checkpoints.
+ * The benchmark for measuring the time taken to finish the configured number of checkpoints,
+ * either aligned or unaligned with a configurable alignment timeout.
  */
-@OutputTimeUnit(MINUTES)
-@OperationsPerInvocation(value = 
UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+@OutputTimeUnit(SECONDS)
+@OperationsPerInvocation(UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
 public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
-    public static final int NUM_FINISHED_CHECKPOINTS = 5;
+    public static final int NUM_FINISHED_CHECKPOINTS = 10;
     private static final int NUM_VERTICES = 3;
     private static final int PARALLELISM = 4;
     private static final long CHECKPOINT_INTERVAL_MS = 10;
 
     public static void main(String[] args) throws RunnerException {
-        Options options = new OptionsBuilder()
-            .verbosity(VerboseMode.NORMAL)
-            .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
-            .build();
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        
.include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
+                        .build();
 
         new Runner(options).run();
     }
 
     @Benchmark
-    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext 
context) throws Exception {
+    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext 
context)
+            throws Exception {
         StreamExecutionEnvironment env = context.env;
-        DataStreamSource<byte[]> source = env.addSource(new 
FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS));
-        source
-            .slotSharingGroup("source").rebalance()
-            .map((MapFunction<byte[], byte[]>) value -> 
value).slotSharingGroup("map").rebalance()
-            .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+        DataStreamSource<Record> source =
+                env.fromSource(
+                        new RecordSource(NUM_FINISHED_CHECKPOINTS),
+                        noWatermarks(),
+                        RecordSource.class.getName());
+
+        source.slotSharingGroup("source")
+                .rebalance()
+                .map((MapFunction<Record, Record>) value -> value)
+                .slotSharingGroup("map")
+                .rebalance()
+                .addSink(new SlowDiscardSink<>())
+                .slotSharingGroup("sink");
 
         env.execute();
     }
 
     public static class UnalignedCheckpointEnvironmentContext extends 
FlinkEnvironmentContext {
 
-        @Param({"REMOTE", "LOCAL"})
-        public String mode = "REMOTE";
+        @Param({"0", "1", "ALIGNED"})
+        public String timeout = "0";
 
         @Setup
         public void setUp() throws IOException {
@@ -88,73 +100,33 @@ public class UnalignedCheckpointTimeBenchmark extends 
BenchmarkBase {
 
             env.setParallelism(parallelism);
             env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
-            env.getCheckpointConfig().enableUnalignedCheckpoints(true);
-            env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
+            if ("ALIGNED".equals(timeout)) {
+                env.getCheckpointConfig().enableUnalignedCheckpoints(false);
+            } else {
+                env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+                env.getCheckpointConfig()
+                        
.setAlignmentTimeout(Duration.ofMillis(Integer.parseInt(timeout)));
+            }
         }
 
         protected Configuration createConfiguration() {
             Configuration conf = super.createConfiguration();
-
-            if (mode.equals("REMOTE")) {
-                conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
-                conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_VERTICES * PARALLELISM);
-            } else {
-                conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_VERTICES * PARALLELISM);
-                conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-            }
+            conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+            conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, new 
MemorySize(1024 * 4));
+            conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_VERTICES * PARALLELISM);
             return conf;
         }
     }
 
     /**
-     * The source for finishing the configured number of checkpoints before 
exiting.
-     */
-    public static class FiniteCheckpointSource extends 
RichParallelSourceFunction<byte[]> implements CheckpointListener {
-
-        private final int numExpectedCheckpoints;
-        private final byte[] bytes = new byte[1024];
-
-        private volatile boolean running = true;
-        private volatile int numFinishedCheckpoints;
-
-        FiniteCheckpointSource(int numCheckpoints) {
-            this.numExpectedCheckpoints = numCheckpoints;
-        }
-
-        @Override
-        public void notifyCheckpointComplete(long checkpointId) {
-            ++numFinishedCheckpoints;
-        }
-
-
-        @Override
-        public void run(SourceContext<byte[]> ctx) {
-            while (running) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect(bytes);
-
-                    if (numFinishedCheckpoints >= numExpectedCheckpoints) {
-                        cancel();
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void cancel() {
-            running = false;
-        }
-    }
-
-    /**
-     * The custom sink for processing records slowly to cause accumulate 
in-flight
-     * buffers even back pressure.
+     * A custom sink that processes records slowly so that in-flight buffers accumulate and can
+     * even cause back pressure.
      */
     public static class SlowDiscardSink<T> implements SinkFunction<T> {
 
         @Override
-        public void invoke(T value, SinkFunction.Context context) throws 
Exception {
-            Thread.sleep(3);
+        public void invoke(T value, Context context) throws Exception {
+            Thread.sleep(1);
         }
     }
 }
diff --git 
a/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java 
b/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java
new file mode 100644
index 0000000..b5c1e46
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/operators/RecordSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.operators;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.benchmark.operators.RecordSource.EmptyEnumeratorState;
+import org.apache.flink.benchmark.operators.RecordSource.EmptySplit;
+import org.apache.flink.benchmark.operators.RecordSource.Record;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** A source that emits {@link Record}s until a minimum number of checkpoints has completed. */
+public class RecordSource implements Source<Record, EmptySplit, 
EmptyEnumeratorState> {
+    public static final int PAYLOAD_SIZE = 1024;
+
+    public static class Record {
+        public long value;
+        public byte[] payload;
+
+        public Record() {
+            this(0);
+        }
+
+        public Record(long value) {
+            this.value = value;
+            payload = new byte[PAYLOAD_SIZE];
+        }
+    }
+
+    private final int minCheckpoints;
+
+    public RecordSource(int minCheckpoints) {
+        this.minCheckpoints = minCheckpoints;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<Record, EmptySplit> createReader(SourceReaderContext 
readerContext) {
+        return new RecourdSourceReader(minCheckpoints);
+    }
+
+    @Override
+    public SplitEnumerator<EmptySplit, EmptyEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<EmptySplit> enumContext) {
+        return new EmptySplitSplitEnumerator();
+    }
+
+    @Override
+    public SplitEnumerator<EmptySplit, EmptyEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<EmptySplit> enumContext, 
EmptyEnumeratorState state) {
+        return new EmptySplitSplitEnumerator();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<EmptySplit> getSplitSerializer() {
+        return new SplitVersionedSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<EmptyEnumeratorState> 
getEnumeratorCheckpointSerializer() {
+        return new EnumeratorVersionedSerializer();
+    }
+
+    public static class RecourdSourceReader implements SourceReader<Record, 
EmptySplit> {
+        private final int minCheckpoints;
+        private int numCompletedCheckpoints;
+        private long counter = 0;
+
+        public RecourdSourceReader(int minCheckpoints) {
+            this.minCheckpoints = minCheckpoints;
+        }
+
+        @Override
+        public void start() {}
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<Record> output) throws 
InterruptedException {
+            output.collect(new Record(counter++));
+
+            if (numCompletedCheckpoints >= minCheckpoints) {
+                return InputStatus.END_OF_INPUT;
+            }
+
+            return InputStatus.MORE_AVAILABLE;
+        }
+
+        @Override
+        public List<EmptySplit> snapshotState(long checkpointId) {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {
+            numCompletedCheckpoints++;
+        }
+
+        @Override
+        public CompletableFuture<Void> isAvailable() {
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public void addSplits(List<EmptySplit> splits) {}
+
+        @Override
+        public void notifyNoMoreSplits() {}
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+    public static class EmptySplit implements SourceSplit {
+        @Override
+        public String splitId() {
+            return "42";
+        }
+    }
+
+    private static class EmptySplitSplitEnumerator
+            implements SplitEnumerator<EmptySplit, EmptyEnumeratorState> {
+        @Override
+        public void start() {}
+
+        @Override
+        public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {}
+
+        @Override
+        public void addSplitsBack(List<EmptySplit> splits, int subtaskId) {}
+
+        @Override
+        public void addReader(int subtaskId) {}
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) {}
+
+        @Override
+        public EmptyEnumeratorState snapshotState(long checkpointId) throws 
Exception {
+            return new EmptyEnumeratorState();
+        }
+
+        @Override
+        public void close() throws IOException {}
+    }
+
+    public static class EmptyEnumeratorState {}
+
+    private static class EnumeratorVersionedSerializer
+            implements SimpleVersionedSerializer<EmptyEnumeratorState> {
+
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(EmptyEnumeratorState state) {
+            return new byte[0];
+        }
+
+        @Override
+        public EmptyEnumeratorState deserialize(int version, byte[] 
serialized) {
+            return new EmptyEnumeratorState();
+        }
+    }
+
+    private static class SplitVersionedSerializer implements 
SimpleVersionedSerializer<EmptySplit> {
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(EmptySplit split) {
+            return new byte[0];
+        }
+
+        @Override
+        public EmptySplit deserialize(int version, byte[] serialized) {
+            return new EmptySplit();
+        }
+    }
+}

Reply via email to