This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new bb597ea  [hotfix] Fix benchmarks not compiling after FLINK-18934
bb597ea is described below

commit bb597ea2fc723375c25a7be262011ab67e044774
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue May 25 18:39:34 2021 +0200

    [hotfix] Fix benchmarks not compiling after FLINK-18934
---
 .../benchmark/SortingBoundedInputBenchmarks.java   | 650 ++++++++++-----------
 1 file changed, 324 insertions(+), 326 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
index 06628a4..ad955e0 100644
--- a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.util.SplittableIterator;
 
 import org.junit.Assert;
@@ -70,332 +71,329 @@ import java.util.stream.IntStream;
 
 import static org.openjdk.jmh.annotations.Scope.Thread;
 
-/**
- * An end to end test for sorted inputs for a keyed operator with bounded 
inputs.
- */
+/** An end to end test for sorted inputs for a keyed operator with bounded 
inputs. */
 public class SortingBoundedInputBenchmarks extends BenchmarkBase {
 
-       private static final int RECORDS_PER_INVOCATION = 1_500_000;
-       private static final List<Integer> INDICES = IntStream.range(0, 
10).boxed().collect(Collectors.toList());
-       static {
-               Collections.shuffle(INDICES);
-       }
-
-       public static void main(String[] args) throws RunnerException {
-               Options options = new OptionsBuilder()
-                               .verbosity(VerboseMode.NORMAL)
-                               .include(".*" + 
SortingBoundedInputBenchmarks.class.getCanonicalName() + ".*")
-                               .build();
-
-               new Runner(options).run();
-       }
-
-       @State(Thread)
-       public static class SortingInputContext extends FlinkEnvironmentContext 
{
-               @Override
-               protected Configuration createConfiguration() {
-                       Configuration configuration = 
super.createConfiguration();
-                       configuration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
-                       
configuration.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0f);
-                       return configuration;
-               }
-       }
-
-       @Benchmark
-       @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
-       public void sortedOneInput(SortingInputContext context) throws 
Exception {
-               StreamExecutionEnvironment env = context.env;
-
-               DataStreamSource<Integer> elements = env.fromParallelCollection(
-                       new InputGenerator(RECORDS_PER_INVOCATION),
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-
-               SingleOutputStreamOperator<Long> counts = elements
-                       .keyBy(element -> element)
-                       .transform(
-                               "Asserting operator",
-                               BasicTypeInfo.LONG_TYPE_INFO,
-                               new AssertingOperator()
-                       );
-
-               counts.addSink(new DiscardingSink<>());
-               context.execute();
-       }
-
-       @Benchmark
-       @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
-       public void sortedTwoInput(SortingInputContext context) throws 
Exception {
-               StreamExecutionEnvironment env = context.env;
-
-               DataStreamSource<Integer> elements1 = 
env.fromParallelCollection(
-                       new InputGenerator(RECORDS_PER_INVOCATION / 2),
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-
-               DataStreamSource<Integer> elements2 = 
env.fromParallelCollection(
-                       new InputGenerator(RECORDS_PER_INVOCATION / 2),
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-               SingleOutputStreamOperator<Long> counts = 
elements1.connect(elements2)
-                       .keyBy(
-                               element -> element,
-                               element -> element
-                       )
-                       .transform(
-                               "Asserting operator",
-                               BasicTypeInfo.LONG_TYPE_INFO,
-                               new AssertingTwoInputOperator()
-                       );
-
-               counts.addSink(new DiscardingSink<>());
-               context.execute();
-       }
-
-       @Benchmark
-       @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
-       public void sortedMultiInput(SortingInputContext context) throws 
Exception {
-               StreamExecutionEnvironment env = context.env;
-
-               KeyedStream<Integer, Object> elements1 = 
env.fromParallelCollection(
-                       new InputGenerator(RECORDS_PER_INVOCATION / 3),
-                       BasicTypeInfo.INT_TYPE_INFO
-               ).keyBy(el -> el);
-
-               KeyedStream<Integer, Object> elements2 = 
env.fromParallelCollection(
-                       new InputGenerator(RECORDS_PER_INVOCATION / 3),
-                       BasicTypeInfo.INT_TYPE_INFO
-               ).keyBy(el -> el);
-
-               KeyedStream<Integer, Object> elements3 = 
env.fromParallelCollection(
-                       new InputGenerator(RECORDS_PER_INVOCATION / 3),
-                       BasicTypeInfo.INT_TYPE_INFO
-               ).keyBy(el -> el);
-
-               KeyedMultipleInputTransformation<Long> assertingTransformation 
= new KeyedMultipleInputTransformation<>(
-                       "Asserting operator",
-                       new AssertingThreeInputOperatorFactory(),
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       -1,
-                       BasicTypeInfo.INT_TYPE_INFO
-               );
-               assertingTransformation.addInput(elements1.getTransformation(), 
elements1.getKeySelector());
-               assertingTransformation.addInput(elements2.getTransformation(), 
elements2.getKeySelector());
-               assertingTransformation.addInput(elements3.getTransformation(), 
elements3.getKeySelector());
-
-               env.addOperator(assertingTransformation);
-               DataStream<Long> counts = new DataStream<>(env, 
assertingTransformation);
-
-               counts.addSink(new DiscardingSink<>());
-               context.execute();
-       }
-
-       private static final class ProcessedKeysOrderAsserter implements 
Serializable {
-               private final Set<Integer> seenKeys = new HashSet<>();
-               private long seenRecords = 0;
-               private Integer currentKey = null;
-
-               public void processElement(Integer element) {
-                       this.seenRecords++;
-                       if (!Objects.equals(element, currentKey)) {
-                               if (!seenKeys.add(element)) {
-                                       Assert.fail("Received an out of order 
key: " + element);
-                               }
-                               currentKey = element;
-                       }
-               }
-
-               public long getSeenRecords() {
-                       return seenRecords;
-               }
-       }
-
-       private static class AssertingOperator extends 
AbstractStreamOperator<Long>
-                       implements OneInputStreamOperator<Integer, Long>, 
BoundedOneInput {
-               private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
-
-               @Override
-               public void processElement(StreamRecord<Integer> element) {
-                       asserter.processElement(element.getValue());
-               }
-
-               @Override
-               public void endInput() {
-                       output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
-               }
-
-       }
-
-       private static class AssertingTwoInputOperator extends 
AbstractStreamOperator<Long>
-                       implements TwoInputStreamOperator<Integer, Integer, 
Long>, BoundedMultiInput {
-               private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
-               private boolean input1Finished = false;
-               private boolean input2Finished = false;
-
-               @Override
-               public void processElement1(StreamRecord<Integer> element) {
-                       asserter.processElement(element.getValue());
-               }
-
-               @Override
-               public void processElement2(StreamRecord<Integer> element) {
-                       asserter.processElement(element.getValue());
-               }
-
-               @Override
-               public void endInput(int inputId) {
-                       if (inputId == 1) {
-                               input1Finished = true;
-                       }
-
-                       if (inputId == 2) {
-                               input2Finished = true;
-                       }
-
-                       if (input1Finished && input2Finished) {
-                               output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
-                       }
-               }
-       }
-
-       private static class AssertingThreeInputOperator extends 
AbstractStreamOperatorV2<Long>
-                       implements MultipleInputStreamOperator<Long>, 
BoundedMultiInput {
-               private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
-               private boolean input1Finished = false;
-               private boolean input2Finished = false;
-               private boolean input3Finished = false;
-
-               public AssertingThreeInputOperator(
-                               StreamOperatorParameters<Long> parameters,
-                               int numberOfInputs) {
-                       super(parameters, 3);
-                       assert numberOfInputs == 3;
-               }
-
-               @Override
-               public void endInput(int inputId) {
-                       if (inputId == 1) {
-                               input1Finished = true;
-                       }
-
-                       if (inputId == 2) {
-                               input2Finished = true;
-                       }
-
-                       if (inputId == 3) {
-                               input3Finished = true;
-                       }
-
-                       if (input1Finished && input2Finished && input3Finished) 
{
-                               output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
-                       }
-               }
-
-               @Override
-               public List<Input> getInputs() {
-                       return Arrays.asList(
-                               new SingleInput(asserter::processElement),
-                               new SingleInput(asserter::processElement),
-                               new SingleInput(asserter::processElement)
-                       );
-               }
-       }
-
-       private static class AssertingThreeInputOperatorFactory implements 
StreamOperatorFactory<Long> {
-               @Override
-               @SuppressWarnings("unchecked")
-               public <T extends StreamOperator<Long>> T 
createStreamOperator(StreamOperatorParameters<Long> parameters) {
-                       return (T) new AssertingThreeInputOperator(parameters, 
3);
-               }
-
-               @Override
-               public void setChainingStrategy(ChainingStrategy strategy) {
-
-               }
-
-               @Override
-               public ChainingStrategy getChainingStrategy() {
-                       return ChainingStrategy.NEVER;
-               }
-
-               @Override
-               public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
-                       return AssertingThreeInputOperator.class;
-               }
-       }
-
-       private static class SingleInput implements Input<Integer> {
-
-               private final Consumer<Integer> recordConsumer;
-
-               private SingleInput(Consumer<Integer> recordConsumer) {
-                       this.recordConsumer = recordConsumer;
-               }
-
-               @Override
-               public void processElement(StreamRecord<Integer> element) {
-                       recordConsumer.accept(element.getValue());
-               }
-
-               @Override
-               public void 
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
-
-               }
-
-               @Override
-               public void processLatencyMarker(LatencyMarker latencyMarker) {
-
-               }
-
-               @Override
-               public void setKeyContextElement(StreamRecord<Integer> record) {
-
-               }
-       }
-
-       private static class InputGenerator extends SplittableIterator<Integer> 
{
-
-               private final long numberOfRecords;
-               private long generatedRecords;
-
-               private InputGenerator(long numberOfRecords) {
-                       this.numberOfRecords = numberOfRecords;
-               }
-
-               @Override
-               @SuppressWarnings("unchecked")
-               public Iterator<Integer>[] split(int numPartitions) {
-                       long numberOfRecordsPerPartition = numberOfRecords / 
numPartitions;
-                       long remainder = numberOfRecords % numPartitions;
-                       Iterator<Integer>[] iterators = new 
Iterator[numPartitions];
-
-                       for (int i = 0; i < numPartitions - 1; i++) {
-                               iterators[i] = new 
InputGenerator(numberOfRecordsPerPartition);
-                       }
-
-                       iterators[numPartitions - 1] = new 
InputGenerator(numberOfRecordsPerPartition + remainder);
-
-                       return iterators;
-               }
-
-               @Override
-               public int getMaximumNumberOfSplits() {
-                       return (int) Math.min(numberOfRecords, 
Integer.MAX_VALUE);
-               }
-
-               @Override
-               public boolean hasNext() {
-                       return generatedRecords < numberOfRecords;
-               }
-
-               @Override
-               public Integer next() {
-                       if (hasNext()) {
-                               generatedRecords++;
-                               return INDICES.get((int) (generatedRecords % 
INDICES.size()));
-                       }
-
-                       return null;
-               }
-       }
+    private static final int RECORDS_PER_INVOCATION = 1_500_000;
+    private static final List<Integer> INDICES =
+            IntStream.range(0, 10).boxed().collect(Collectors.toList());
+
+    static {
+        Collections.shuffle(INDICES);
+    }
+
+    public static void main(String[] args) throws RunnerException {
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(
+                                ".*"
+                                        + 
SortingBoundedInputBenchmarks.class.getCanonicalName()
+                                        + ".*")
+                        .build();
+
+        new Runner(options).run();
+    }
+
+    @State(Thread)
+    public static class SortingInputContext extends FlinkEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            Configuration configuration = super.createConfiguration();
+            configuration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+            configuration.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0f);
+            return configuration;
+        }
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+    public void sortedOneInput(SortingInputContext context) throws Exception {
+        StreamExecutionEnvironment env = context.env;
+
+        DataStreamSource<Integer> elements =
+                env.fromParallelCollection(
+                        new InputGenerator(RECORDS_PER_INVOCATION), 
BasicTypeInfo.INT_TYPE_INFO);
+
+        SingleOutputStreamOperator<Long> counts =
+                elements.keyBy(element -> element)
+                        .transform(
+                                "Asserting operator",
+                                BasicTypeInfo.LONG_TYPE_INFO,
+                                new AssertingOperator());
+
+        counts.addSink(new DiscardingSink<>());
+        context.execute();
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+    public void sortedTwoInput(SortingInputContext context) throws Exception {
+        StreamExecutionEnvironment env = context.env;
+
+        DataStreamSource<Integer> elements1 =
+                env.fromParallelCollection(
+                        new InputGenerator(RECORDS_PER_INVOCATION / 2),
+                        BasicTypeInfo.INT_TYPE_INFO);
+
+        DataStreamSource<Integer> elements2 =
+                env.fromParallelCollection(
+                        new InputGenerator(RECORDS_PER_INVOCATION / 2),
+                        BasicTypeInfo.INT_TYPE_INFO);
+        SingleOutputStreamOperator<Long> counts =
+                elements1
+                        .connect(elements2)
+                        .keyBy(element -> element, element -> element)
+                        .transform(
+                                "Asserting operator",
+                                BasicTypeInfo.LONG_TYPE_INFO,
+                                new AssertingTwoInputOperator());
+
+        counts.addSink(new DiscardingSink<>());
+        context.execute();
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+    public void sortedMultiInput(SortingInputContext context) throws Exception 
{
+        StreamExecutionEnvironment env = context.env;
+
+        KeyedStream<Integer, Object> elements1 =
+                env.fromParallelCollection(
+                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
+                                BasicTypeInfo.INT_TYPE_INFO)
+                        .keyBy(el -> el);
+
+        KeyedStream<Integer, Object> elements2 =
+                env.fromParallelCollection(
+                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
+                                BasicTypeInfo.INT_TYPE_INFO)
+                        .keyBy(el -> el);
+
+        KeyedStream<Integer, Object> elements3 =
+                env.fromParallelCollection(
+                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
+                                BasicTypeInfo.INT_TYPE_INFO)
+                        .keyBy(el -> el);
+
+        KeyedMultipleInputTransformation<Long> assertingTransformation =
+                new KeyedMultipleInputTransformation<>(
+                        "Asserting operator",
+                        new AssertingThreeInputOperatorFactory(),
+                        BasicTypeInfo.LONG_TYPE_INFO,
+                        -1,
+                        BasicTypeInfo.INT_TYPE_INFO);
+        assertingTransformation.addInput(elements1.getTransformation(), 
elements1.getKeySelector());
+        assertingTransformation.addInput(elements2.getTransformation(), 
elements2.getKeySelector());
+        assertingTransformation.addInput(elements3.getTransformation(), 
elements3.getKeySelector());
+
+        env.addOperator(assertingTransformation);
+        DataStream<Long> counts = new DataStream<>(env, 
assertingTransformation);
+
+        counts.addSink(new DiscardingSink<>());
+        context.execute();
+    }
+
+    private static final class ProcessedKeysOrderAsserter implements 
Serializable {
+        private final Set<Integer> seenKeys = new HashSet<>();
+        private long seenRecords = 0;
+        private Integer currentKey = null;
+
+        public void processElement(Integer element) {
+            this.seenRecords++;
+            if (!Objects.equals(element, currentKey)) {
+                if (!seenKeys.add(element)) {
+                    Assert.fail("Received an out of order key: " + element);
+                }
+                currentKey = element;
+            }
+        }
+
+        public long getSeenRecords() {
+            return seenRecords;
+        }
+    }
+
+    private static class AssertingOperator extends AbstractStreamOperator<Long>
+            implements OneInputStreamOperator<Integer, Long>, BoundedOneInput {
+        private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+
+        @Override
+        public void processElement(StreamRecord<Integer> element) {
+            asserter.processElement(element.getValue());
+        }
+
+        @Override
+        public void endInput() {
+            output.collect(new StreamRecord<>(asserter.getSeenRecords()));
+        }
+    }
+
+    private static class AssertingTwoInputOperator extends 
AbstractStreamOperator<Long>
+            implements TwoInputStreamOperator<Integer, Integer, Long>, 
BoundedMultiInput {
+        private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+        private boolean input1Finished = false;
+        private boolean input2Finished = false;
+
+        @Override
+        public void processElement1(StreamRecord<Integer> element) {
+            asserter.processElement(element.getValue());
+        }
+
+        @Override
+        public void processElement2(StreamRecord<Integer> element) {
+            asserter.processElement(element.getValue());
+        }
+
+        @Override
+        public void endInput(int inputId) {
+            if (inputId == 1) {
+                input1Finished = true;
+            }
+
+            if (inputId == 2) {
+                input2Finished = true;
+            }
+
+            if (input1Finished && input2Finished) {
+                output.collect(new StreamRecord<>(asserter.getSeenRecords()));
+            }
+        }
+    }
+
+    private static class AssertingThreeInputOperator extends 
AbstractStreamOperatorV2<Long>
+            implements MultipleInputStreamOperator<Long>, BoundedMultiInput {
+        private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+        private boolean input1Finished = false;
+        private boolean input2Finished = false;
+        private boolean input3Finished = false;
+
+        public AssertingThreeInputOperator(
+                StreamOperatorParameters<Long> parameters, int numberOfInputs) 
{
+            super(parameters, 3);
+            assert numberOfInputs == 3;
+        }
+
+        @Override
+        public void endInput(int inputId) {
+            if (inputId == 1) {
+                input1Finished = true;
+            }
+
+            if (inputId == 2) {
+                input2Finished = true;
+            }
+
+            if (inputId == 3) {
+                input3Finished = true;
+            }
+
+            if (input1Finished && input2Finished && input3Finished) {
+                output.collect(new StreamRecord<>(asserter.getSeenRecords()));
+            }
+        }
+
+        @Override
+        public List<Input> getInputs() {
+            return Arrays.asList(
+                    new SingleInput(asserter::processElement),
+                    new SingleInput(asserter::processElement),
+                    new SingleInput(asserter::processElement));
+        }
+    }
+
+    private static class AssertingThreeInputOperatorFactory implements 
StreamOperatorFactory<Long> {
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Long>> T createStreamOperator(
+                StreamOperatorParameters<Long> parameters) {
+            return (T) new AssertingThreeInputOperator(parameters, 3);
+        }
+
+        @Override
+        public void setChainingStrategy(ChainingStrategy strategy) {}
+
+        @Override
+        public ChainingStrategy getChainingStrategy() {
+            return ChainingStrategy.NEVER;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return AssertingThreeInputOperator.class;
+        }
+    }
+
+    private static class SingleInput implements Input<Integer> {
+
+        private final Consumer<Integer> recordConsumer;
+
+        private SingleInput(Consumer<Integer> recordConsumer) {
+            this.recordConsumer = recordConsumer;
+        }
+
+        @Override
+        public void processElement(StreamRecord<Integer> element) {
+            recordConsumer.accept(element.getValue());
+        }
+
+        @Override
+        public void 
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {}
+
+        @Override
+        public void processLatencyMarker(LatencyMarker latencyMarker) {}
+
+        @Override
+        public void setKeyContextElement(StreamRecord<Integer> record) {}
+
+        @Override
+        public void emitStreamStatus(StreamStatus streamStatus) throws 
Exception {}
+    }
+
+    private static class InputGenerator extends SplittableIterator<Integer> {
+
+        private final long numberOfRecords;
+        private long generatedRecords;
+
+        private InputGenerator(long numberOfRecords) {
+            this.numberOfRecords = numberOfRecords;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public Iterator<Integer>[] split(int numPartitions) {
+            long numberOfRecordsPerPartition = numberOfRecords / numPartitions;
+            long remainder = numberOfRecords % numPartitions;
+            Iterator<Integer>[] iterators = new Iterator[numPartitions];
+
+            for (int i = 0; i < numPartitions - 1; i++) {
+                iterators[i] = new InputGenerator(numberOfRecordsPerPartition);
+            }
+
+            iterators[numPartitions - 1] =
+                    new InputGenerator(numberOfRecordsPerPartition + 
remainder);
+
+            return iterators;
+        }
+
+        @Override
+        public int getMaximumNumberOfSplits() {
+            return (int) Math.min(numberOfRecords, Integer.MAX_VALUE);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return generatedRecords < numberOfRecords;
+        }
+
+        @Override
+        public Integer next() {
+            if (hasNext()) {
+                generatedRecords++;
+                return INDICES.get((int) (generatedRecords % INDICES.size()));
+            }
+
+            return null;
+        }
+    }
 }

Reply via email to