This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 2df81781c55f145deb815591bec922a735e7dfa6
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Oct 29 15:12:45 2020 +0100

    [FLINK-19884] Add benchmarks for batch-style execution for bounded keyed 
streams
---
 .../benchmark/SortingBoundedInputBenchmarks.java   | 401 +++++++++++++++++++++
 1 file changed, 401 insertions(+)

diff --git 
a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java 
b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
new file mode 100644
index 0000000..06628a4
--- /dev/null
+++ 
b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SplittableIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.openjdk.jmh.annotations.Scope.Thread;
+
/**
 * End-to-end benchmarks for batch-style (sorted-input) execution of keyed
 * operators over bounded streams.
 */
+public class SortingBoundedInputBenchmarks extends BenchmarkBase {
+
+       private static final int RECORDS_PER_INVOCATION = 1_500_000;
+       private static final List<Integer> INDICES = IntStream.range(0, 
10).boxed().collect(Collectors.toList());
+       static {
+               Collections.shuffle(INDICES);
+       }
+
+       public static void main(String[] args) throws RunnerException {
+               Options options = new OptionsBuilder()
+                               .verbosity(VerboseMode.NORMAL)
+                               .include(".*" + 
SortingBoundedInputBenchmarks.class.getCanonicalName() + ".*")
+                               .build();
+
+               new Runner(options).run();
+       }
+
+       @State(Thread)
+       public static class SortingInputContext extends FlinkEnvironmentContext 
{
+               @Override
+               protected Configuration createConfiguration() {
+                       Configuration configuration = 
super.createConfiguration();
+                       configuration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+                       
configuration.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0f);
+                       return configuration;
+               }
+       }
+
+       @Benchmark
+       @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+       public void sortedOneInput(SortingInputContext context) throws 
Exception {
+               StreamExecutionEnvironment env = context.env;
+
+               DataStreamSource<Integer> elements = env.fromParallelCollection(
+                       new InputGenerator(RECORDS_PER_INVOCATION),
+                       BasicTypeInfo.INT_TYPE_INFO
+               );
+
+               SingleOutputStreamOperator<Long> counts = elements
+                       .keyBy(element -> element)
+                       .transform(
+                               "Asserting operator",
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               new AssertingOperator()
+                       );
+
+               counts.addSink(new DiscardingSink<>());
+               context.execute();
+       }
+
+       @Benchmark
+       @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+       public void sortedTwoInput(SortingInputContext context) throws 
Exception {
+               StreamExecutionEnvironment env = context.env;
+
+               DataStreamSource<Integer> elements1 = 
env.fromParallelCollection(
+                       new InputGenerator(RECORDS_PER_INVOCATION / 2),
+                       BasicTypeInfo.INT_TYPE_INFO
+               );
+
+               DataStreamSource<Integer> elements2 = 
env.fromParallelCollection(
+                       new InputGenerator(RECORDS_PER_INVOCATION / 2),
+                       BasicTypeInfo.INT_TYPE_INFO
+               );
+               SingleOutputStreamOperator<Long> counts = 
elements1.connect(elements2)
+                       .keyBy(
+                               element -> element,
+                               element -> element
+                       )
+                       .transform(
+                               "Asserting operator",
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               new AssertingTwoInputOperator()
+                       );
+
+               counts.addSink(new DiscardingSink<>());
+               context.execute();
+       }
+
+       @Benchmark
+       @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+       public void sortedMultiInput(SortingInputContext context) throws 
Exception {
+               StreamExecutionEnvironment env = context.env;
+
+               KeyedStream<Integer, Object> elements1 = 
env.fromParallelCollection(
+                       new InputGenerator(RECORDS_PER_INVOCATION / 3),
+                       BasicTypeInfo.INT_TYPE_INFO
+               ).keyBy(el -> el);
+
+               KeyedStream<Integer, Object> elements2 = 
env.fromParallelCollection(
+                       new InputGenerator(RECORDS_PER_INVOCATION / 3),
+                       BasicTypeInfo.INT_TYPE_INFO
+               ).keyBy(el -> el);
+
+               KeyedStream<Integer, Object> elements3 = 
env.fromParallelCollection(
+                       new InputGenerator(RECORDS_PER_INVOCATION / 3),
+                       BasicTypeInfo.INT_TYPE_INFO
+               ).keyBy(el -> el);
+
+               KeyedMultipleInputTransformation<Long> assertingTransformation 
= new KeyedMultipleInputTransformation<>(
+                       "Asserting operator",
+                       new AssertingThreeInputOperatorFactory(),
+                       BasicTypeInfo.LONG_TYPE_INFO,
+                       -1,
+                       BasicTypeInfo.INT_TYPE_INFO
+               );
+               assertingTransformation.addInput(elements1.getTransformation(), 
elements1.getKeySelector());
+               assertingTransformation.addInput(elements2.getTransformation(), 
elements2.getKeySelector());
+               assertingTransformation.addInput(elements3.getTransformation(), 
elements3.getKeySelector());
+
+               env.addOperator(assertingTransformation);
+               DataStream<Long> counts = new DataStream<>(env, 
assertingTransformation);
+
+               counts.addSink(new DiscardingSink<>());
+               context.execute();
+       }
+
+       private static final class ProcessedKeysOrderAsserter implements 
Serializable {
+               private final Set<Integer> seenKeys = new HashSet<>();
+               private long seenRecords = 0;
+               private Integer currentKey = null;
+
+               public void processElement(Integer element) {
+                       this.seenRecords++;
+                       if (!Objects.equals(element, currentKey)) {
+                               if (!seenKeys.add(element)) {
+                                       Assert.fail("Received an out of order 
key: " + element);
+                               }
+                               currentKey = element;
+                       }
+               }
+
+               public long getSeenRecords() {
+                       return seenRecords;
+               }
+       }
+
+       private static class AssertingOperator extends 
AbstractStreamOperator<Long>
+                       implements OneInputStreamOperator<Integer, Long>, 
BoundedOneInput {
+               private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+
+               @Override
+               public void processElement(StreamRecord<Integer> element) {
+                       asserter.processElement(element.getValue());
+               }
+
+               @Override
+               public void endInput() {
+                       output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
+               }
+
+       }
+
+       private static class AssertingTwoInputOperator extends 
AbstractStreamOperator<Long>
+                       implements TwoInputStreamOperator<Integer, Integer, 
Long>, BoundedMultiInput {
+               private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+               private boolean input1Finished = false;
+               private boolean input2Finished = false;
+
+               @Override
+               public void processElement1(StreamRecord<Integer> element) {
+                       asserter.processElement(element.getValue());
+               }
+
+               @Override
+               public void processElement2(StreamRecord<Integer> element) {
+                       asserter.processElement(element.getValue());
+               }
+
+               @Override
+               public void endInput(int inputId) {
+                       if (inputId == 1) {
+                               input1Finished = true;
+                       }
+
+                       if (inputId == 2) {
+                               input2Finished = true;
+                       }
+
+                       if (input1Finished && input2Finished) {
+                               output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
+                       }
+               }
+       }
+
+       private static class AssertingThreeInputOperator extends 
AbstractStreamOperatorV2<Long>
+                       implements MultipleInputStreamOperator<Long>, 
BoundedMultiInput {
+               private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+               private boolean input1Finished = false;
+               private boolean input2Finished = false;
+               private boolean input3Finished = false;
+
+               public AssertingThreeInputOperator(
+                               StreamOperatorParameters<Long> parameters,
+                               int numberOfInputs) {
+                       super(parameters, 3);
+                       assert numberOfInputs == 3;
+               }
+
+               @Override
+               public void endInput(int inputId) {
+                       if (inputId == 1) {
+                               input1Finished = true;
+                       }
+
+                       if (inputId == 2) {
+                               input2Finished = true;
+                       }
+
+                       if (inputId == 3) {
+                               input3Finished = true;
+                       }
+
+                       if (input1Finished && input2Finished && input3Finished) 
{
+                               output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
+                       }
+               }
+
+               @Override
+               public List<Input> getInputs() {
+                       return Arrays.asList(
+                               new SingleInput(asserter::processElement),
+                               new SingleInput(asserter::processElement),
+                               new SingleInput(asserter::processElement)
+                       );
+               }
+       }
+
+       private static class AssertingThreeInputOperatorFactory implements 
StreamOperatorFactory<Long> {
+               @Override
+               @SuppressWarnings("unchecked")
+               public <T extends StreamOperator<Long>> T 
createStreamOperator(StreamOperatorParameters<Long> parameters) {
+                       return (T) new AssertingThreeInputOperator(parameters, 
3);
+               }
+
+               @Override
+               public void setChainingStrategy(ChainingStrategy strategy) {
+
+               }
+
+               @Override
+               public ChainingStrategy getChainingStrategy() {
+                       return ChainingStrategy.NEVER;
+               }
+
+               @Override
+               public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+                       return AssertingThreeInputOperator.class;
+               }
+       }
+
+       private static class SingleInput implements Input<Integer> {
+
+               private final Consumer<Integer> recordConsumer;
+
+               private SingleInput(Consumer<Integer> recordConsumer) {
+                       this.recordConsumer = recordConsumer;
+               }
+
+               @Override
+               public void processElement(StreamRecord<Integer> element) {
+                       recordConsumer.accept(element.getValue());
+               }
+
+               @Override
+               public void 
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
+
+               }
+
+               @Override
+               public void processLatencyMarker(LatencyMarker latencyMarker) {
+
+               }
+
+               @Override
+               public void setKeyContextElement(StreamRecord<Integer> record) {
+
+               }
+       }
+
+       private static class InputGenerator extends SplittableIterator<Integer> 
{
+
+               private final long numberOfRecords;
+               private long generatedRecords;
+
+               private InputGenerator(long numberOfRecords) {
+                       this.numberOfRecords = numberOfRecords;
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public Iterator<Integer>[] split(int numPartitions) {
+                       long numberOfRecordsPerPartition = numberOfRecords / 
numPartitions;
+                       long remainder = numberOfRecords % numPartitions;
+                       Iterator<Integer>[] iterators = new 
Iterator[numPartitions];
+
+                       for (int i = 0; i < numPartitions - 1; i++) {
+                               iterators[i] = new 
InputGenerator(numberOfRecordsPerPartition);
+                       }
+
+                       iterators[numPartitions - 1] = new 
InputGenerator(numberOfRecordsPerPartition + remainder);
+
+                       return iterators;
+               }
+
+               @Override
+               public int getMaximumNumberOfSplits() {
+                       return (int) Math.min(numberOfRecords, 
Integer.MAX_VALUE);
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return generatedRecords < numberOfRecords;
+               }
+
+               @Override
+               public Integer next() {
+                       if (hasNext()) {
+                               generatedRecords++;
+                               return INDICES.get((int) (generatedRecords % 
INDICES.size()));
+                       }
+
+                       return null;
+               }
+       }
+}

Reply via email to