This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch test1.14 in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit de6d62f4e73ca79a39da647ee21de244fc4e154b Author: Yun Gao <[email protected]> AuthorDate: Wed Sep 22 21:08:21 2021 +0800 Fix compile problem --- pom.xml | 2 +- .../benchmark/SortingBoundedInputBenchmarks.java | 820 +++++++++++---------- 2 files changed, 422 insertions(+), 400 deletions(-) diff --git a/pom.xml b/pom.xml index f92f166..36cf2eb 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ under the License. <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <flink.version>1.15-SNAPSHOT</flink.version> + <flink.version>1.14-SNAPSHOT</flink.version> <flink.shaded.version>14.0</flink.shaded.version> <netty.tcnative.version>2.0.39.Final</netty.tcnative.version> <java.version>1.8</java.version> diff --git a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java index 3a0af48..c12a5e8 100644 --- a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java +++ b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java @@ -1,399 +1,421 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.benchmark; - -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.configuration.AlgorithmOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; -import org.apache.flink.streaming.api.operators.BoundedMultiInput; -import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.Input; -import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.util.SplittableIterator; - -import org.junit.Assert; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.OperationsPerInvocation; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.RunnerException; -import org.openjdk.jmh.runner.options.Options; -import org.openjdk.jmh.runner.options.OptionsBuilder; -import org.openjdk.jmh.runner.options.VerboseMode; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.openjdk.jmh.annotations.Scope.Thread; - -/** An end to end test for sorted inputs for a keyed operator with bounded inputs. */ -public class SortingBoundedInputBenchmarks extends BenchmarkBase { - - private static final int RECORDS_PER_INVOCATION = 1_500_000; - private static final List<Integer> INDICES = - IntStream.range(0, 10).boxed().collect(Collectors.toList()); - - static { - Collections.shuffle(INDICES); - } - - public static void main(String[] args) throws RunnerException { - Options options = - new OptionsBuilder() - .verbosity(VerboseMode.NORMAL) - .include( - ".*" - + SortingBoundedInputBenchmarks.class.getCanonicalName() - + ".*") - .build(); - - new Runner(options).run(); - } - - @State(Thread) - public static class SortingInputContext extends FlinkEnvironmentContext { - @Override - protected Configuration createConfiguration() { - Configuration configuration = super.createConfiguration(); - configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); - configuration.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0f); - return configuration; - } - } - - @Benchmark - @OperationsPerInvocation(value = RECORDS_PER_INVOCATION) - public void sortedOneInput(SortingInputContext context) throws Exception { - StreamExecutionEnvironment env = context.env; - - DataStreamSource<Integer> elements = - env.fromParallelCollection( - new InputGenerator(RECORDS_PER_INVOCATION), BasicTypeInfo.INT_TYPE_INFO); - - SingleOutputStreamOperator<Long> counts = - elements.keyBy(element -> element) - .transform( - "Asserting operator", - BasicTypeInfo.LONG_TYPE_INFO, - new AssertingOperator()); - - counts.addSink(new DiscardingSink<>()); - context.execute(); - } - - @Benchmark - @OperationsPerInvocation(value = RECORDS_PER_INVOCATION) - public void sortedTwoInput(SortingInputContext context) throws Exception { - StreamExecutionEnvironment env = context.env; - - DataStreamSource<Integer> elements1 = - env.fromParallelCollection( - new InputGenerator(RECORDS_PER_INVOCATION / 2), - BasicTypeInfo.INT_TYPE_INFO); - - DataStreamSource<Integer> elements2 = - env.fromParallelCollection( - new InputGenerator(RECORDS_PER_INVOCATION / 2), - BasicTypeInfo.INT_TYPE_INFO); - SingleOutputStreamOperator<Long> counts = - elements1 - .connect(elements2) - .keyBy(element -> element, element -> element) - .transform( - "Asserting operator", - BasicTypeInfo.LONG_TYPE_INFO, - new AssertingTwoInputOperator()); - - counts.addSink(new DiscardingSink<>()); - context.execute(); - } - - @Benchmark - @OperationsPerInvocation(value = RECORDS_PER_INVOCATION) - public void sortedMultiInput(SortingInputContext context) throws Exception { - StreamExecutionEnvironment env = context.env; - - KeyedStream<Integer, Object> elements1 = - env.fromParallelCollection( - new InputGenerator(RECORDS_PER_INVOCATION / 3), - BasicTypeInfo.INT_TYPE_INFO) - .keyBy(el -> el); - - KeyedStream<Integer, Object> elements2 = - env.fromParallelCollection( - new InputGenerator(RECORDS_PER_INVOCATION / 3), - BasicTypeInfo.INT_TYPE_INFO) - .keyBy(el -> el); - - KeyedStream<Integer, Object> elements3 = - env.fromParallelCollection( - new InputGenerator(RECORDS_PER_INVOCATION / 3), - BasicTypeInfo.INT_TYPE_INFO) - .keyBy(el -> el); - - KeyedMultipleInputTransformation<Long> assertingTransformation = - new KeyedMultipleInputTransformation<>( - "Asserting operator", - new AssertingThreeInputOperatorFactory(), - BasicTypeInfo.LONG_TYPE_INFO, - -1, - BasicTypeInfo.INT_TYPE_INFO); - assertingTransformation.addInput(elements1.getTransformation(), elements1.getKeySelector()); - assertingTransformation.addInput(elements2.getTransformation(), elements2.getKeySelector()); - assertingTransformation.addInput(elements3.getTransformation(), elements3.getKeySelector()); - - env.addOperator(assertingTransformation); - DataStream<Long> counts = new DataStream<>(env, assertingTransformation); - - counts.addSink(new DiscardingSink<>()); - context.execute(); - } - - private static final class ProcessedKeysOrderAsserter implements Serializable { - private final Set<Integer> seenKeys = new HashSet<>(); - private long seenRecords = 0; - private Integer currentKey = null; - - public void processElement(Integer element) { - this.seenRecords++; - if (!Objects.equals(element, currentKey)) { - if (!seenKeys.add(element)) { - Assert.fail("Received an out of order key: " + element); - } - currentKey = element; - } - } - - public long getSeenRecords() { - return seenRecords; - } - } - - private static class AssertingOperator extends AbstractStreamOperator<Long> - implements OneInputStreamOperator<Integer, Long>, BoundedOneInput { - private final ProcessedKeysOrderAsserter asserter = new ProcessedKeysOrderAsserter(); - - @Override - public void processElement(StreamRecord<Integer> element) { - asserter.processElement(element.getValue()); - } - - @Override - public void endInput() { - output.collect(new StreamRecord<>(asserter.getSeenRecords())); - } - } - - private static class AssertingTwoInputOperator extends AbstractStreamOperator<Long> - implements TwoInputStreamOperator<Integer, Integer, Long>, BoundedMultiInput { - private final ProcessedKeysOrderAsserter asserter = new ProcessedKeysOrderAsserter(); - private boolean input1Finished = false; - private boolean input2Finished = false; - - @Override - public void processElement1(StreamRecord<Integer> element) { - asserter.processElement(element.getValue()); - } - - @Override - public void processElement2(StreamRecord<Integer> element) { - asserter.processElement(element.getValue()); - } - - @Override - public void endInput(int inputId) { - if (inputId == 1) { - input1Finished = true; - } - - if (inputId == 2) { - input2Finished = true; - } - - if (input1Finished && input2Finished) { - output.collect(new StreamRecord<>(asserter.getSeenRecords())); - } - } - } - - private static class AssertingThreeInputOperator extends AbstractStreamOperatorV2<Long> - implements MultipleInputStreamOperator<Long>, BoundedMultiInput { - private final ProcessedKeysOrderAsserter asserter = new ProcessedKeysOrderAsserter(); - private boolean input1Finished = false; - private boolean input2Finished = false; - private boolean input3Finished = false; - - public AssertingThreeInputOperator( - StreamOperatorParameters<Long> parameters, int numberOfInputs) { - super(parameters, 3); - assert numberOfInputs == 3; - } - - @Override - public void endInput(int inputId) { - if (inputId == 1) { - input1Finished = true; - } - - if (inputId == 2) { - input2Finished = true; - } - - if (inputId == 3) { - input3Finished = true; - } - - if (input1Finished && input2Finished && input3Finished) { - output.collect(new StreamRecord<>(asserter.getSeenRecords())); - } - } - - @Override - public List<Input> getInputs() { - return Arrays.asList( - new SingleInput(asserter::processElement), - new SingleInput(asserter::processElement), - new SingleInput(asserter::processElement)); - } - } - - private static class AssertingThreeInputOperatorFactory implements StreamOperatorFactory<Long> { - @Override - @SuppressWarnings("unchecked") - public <T extends StreamOperator<Long>> T createStreamOperator( - StreamOperatorParameters<Long> parameters) { - return (T) new AssertingThreeInputOperator(parameters, 3); - } - - @Override - public void setChainingStrategy(ChainingStrategy strategy) {} - - @Override - public ChainingStrategy getChainingStrategy() { - return ChainingStrategy.NEVER; - } - - @Override - public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { - return AssertingThreeInputOperator.class; - } - } - - private static class SingleInput implements Input<Integer> { - - private final Consumer<Integer> recordConsumer; - - private SingleInput(Consumer<Integer> recordConsumer) { - this.recordConsumer = recordConsumer; - } - - @Override - public void processElement(StreamRecord<Integer> element) { - recordConsumer.accept(element.getValue()); - } - - @Override - public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {} - - @Override - public void processLatencyMarker(LatencyMarker latencyMarker) {} - - @Override - public void setKeyContextElement(StreamRecord<Integer> record) {} - - @Override - public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {} - } - - private static class InputGenerator extends SplittableIterator<Integer> { - - private final long numberOfRecords; - private long generatedRecords; - - private InputGenerator(long numberOfRecords) { - this.numberOfRecords = numberOfRecords; - } - - @Override - @SuppressWarnings("unchecked") - public Iterator<Integer>[] split(int numPartitions) { - long numberOfRecordsPerPartition = numberOfRecords / numPartitions; - long remainder = numberOfRecords % numPartitions; - Iterator<Integer>[] iterators = new Iterator[numPartitions]; - - for (int i = 0; i < numPartitions - 1; i++) { - iterators[i] = new InputGenerator(numberOfRecordsPerPartition); - } - - iterators[numPartitions - 1] = - new InputGenerator(numberOfRecordsPerPartition + remainder); - - return iterators; - } - - @Override - public int getMaximumNumberOfSplits() { - return (int) Math.min(numberOfRecords, Integer.MAX_VALUE); - } - - @Override - public boolean hasNext() { - return generatedRecords < numberOfRecords; - } - - @Override - public Integer next() { - if (hasNext()) { - generatedRecords++; - return INDICES.get((int) (generatedRecords % INDICES.size())); - } - - return null; - } - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.flink.benchmark; +// +//import org.apache.flink.api.common.RuntimeExecutionMode; +//import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +//import org.apache.flink.configuration.AlgorithmOptions; +//import org.apache.flink.configuration.Configuration; +//import org.apache.flink.configuration.ExecutionOptions; +//import org.apache.flink.streaming.api.datastream.DataStream; +//import org.apache.flink.streaming.api.datastream.DataStreamSource; +//import org.apache.flink.streaming.api.datastream.KeyedStream; +//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +//import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +//import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +//import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; +//import org.apache.flink.streaming.api.operators.BoundedMultiInput; +//import org.apache.flink.streaming.api.operators.BoundedOneInput; +//import org.apache.flink.streaming.api.operators.ChainingStrategy; +//import org.apache.flink.streaming.api.operators.Input; +//import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +//import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +//import org.apache.flink.streaming.api.operators.StreamOperator; +//import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +//import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +//import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +//import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation; +//import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +//import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +//import org.apache.flink.util.SplittableIterator; +// +//import org.junit.Assert; +//import org.openjdk.jmh.annotations.Benchmark; +//import org.openjdk.jmh.annotations.OperationsPerInvocation; +//import org.openjdk.jmh.annotations.State; +//import org.openjdk.jmh.runner.Runner; +//import org.openjdk.jmh.runner.RunnerException; +//import org.openjdk.jmh.runner.options.Options; +//import org.openjdk.jmh.runner.options.OptionsBuilder; +//import org.openjdk.jmh.runner.options.VerboseMode; +// +//import java.io.Serializable; +//import java.util.Arrays; +//import java.util.Collections; +//import java.util.HashSet; +//import java.util.Iterator; +//import java.util.List; +//import java.util.Objects; +//import java.util.Set; +//import java.util.function.Consumer; +//import java.util.stream.Collectors; +//import java.util.stream.IntStream; +// +//import static org.openjdk.jmh.annotations.Scope.Thread; +// +///** +// * An end to end test for sorted inputs for a keyed operator with bounded inputs. +// */ +//public class SortingBoundedInputBenchmarks extends BenchmarkBase { +// +// private static final int RECORDS_PER_INVOCATION = 1_500_000; +// private static final List<Integer> INDICES = +// IntStream.range(0, 10).boxed().collect(Collectors.toList()); +// +// static { +// Collections.shuffle(INDICES); +// } +// +// public static void main(String[] args) throws RunnerException { +// Options options = +// new OptionsBuilder() +// .verbosity(VerboseMode.NORMAL) +// .include( +// ".*" +// + SortingBoundedInputBenchmarks.class.getCanonicalName() +// + ".*") +// .build(); +// +// new Runner(options).run(); +// } +// +// @State(Thread) +// public static class SortingInputContext extends FlinkEnvironmentContext { +// @Override +// protected Configuration createConfiguration() { +// Configuration configuration = super.createConfiguration(); +// configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); +// configuration.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0f); +// return configuration; +// } +// } +// +// @Benchmark +// @OperationsPerInvocation(value = RECORDS_PER_INVOCATION) +// public void sortedOneInput(SortingInputContext context) throws Exception { +// StreamExecutionEnvironment env = context.env; +// +// DataStreamSource<Integer> elements = +// env.fromParallelCollection( +// new InputGenerator(RECORDS_PER_INVOCATION), BasicTypeInfo.INT_TYPE_INFO); +// +// SingleOutputStreamOperator<Long> counts = +// elements.keyBy(element -> element) +// .transform( +// "Asserting operator", +// BasicTypeInfo.LONG_TYPE_INFO, +// new AssertingOperator()); +// +// counts.addSink(new DiscardingSink<>()); +// context.execute(); +// } +// +// @Benchmark +// @OperationsPerInvocation(value = RECORDS_PER_INVOCATION) +// public void sortedTwoInput(SortingInputContext context) throws Exception { +// StreamExecutionEnvironment env = context.env; +// +// DataStreamSource<Integer> elements1 = +// env.fromParallelCollection( +// new InputGenerator(RECORDS_PER_INVOCATION / 2), +// BasicTypeInfo.INT_TYPE_INFO); +// +// DataStreamSource<Integer> elements2 = +// env.fromParallelCollection( +// new InputGenerator(RECORDS_PER_INVOCATION / 2), +// BasicTypeInfo.INT_TYPE_INFO); +// SingleOutputStreamOperator<Long> counts = +// elements1 +// .connect(elements2) +// .keyBy(element -> element, element -> element) +// .transform( +// "Asserting operator", +// BasicTypeInfo.LONG_TYPE_INFO, +// new AssertingTwoInputOperator()); +// +// counts.addSink(new DiscardingSink<>()); +// context.execute(); +// } +// +// @Benchmark +// @OperationsPerInvocation(value = RECORDS_PER_INVOCATION) +// public void sortedMultiInput(SortingInputContext context) throws Exception { +// StreamExecutionEnvironment env = context.env; +// +// KeyedStream<Integer, Object> elements1 = +// env.fromParallelCollection( +// new InputGenerator(RECORDS_PER_INVOCATION / 3), +// BasicTypeInfo.INT_TYPE_INFO) +// .keyBy(el -> el); +// +// KeyedStream<Integer, Object> elements2 = +// env.fromParallelCollection( +// new InputGenerator(RECORDS_PER_INVOCATION / 3), +// BasicTypeInfo.INT_TYPE_INFO) +// .keyBy(el -> el); +// +// KeyedStream<Integer, Object> elements3 = +// env.fromParallelCollection( +// new InputGenerator(RECORDS_PER_INVOCATION / 3), +// BasicTypeInfo.INT_TYPE_INFO) +// .keyBy(el -> el); +// +// KeyedMultipleInputTransformation<Long> assertingTransformation = +// new KeyedMultipleInputTransformation<>( +// "Asserting operator", +// new AssertingThreeInputOperatorFactory(), +// BasicTypeInfo.LONG_TYPE_INFO, +// -1, +// BasicTypeInfo.INT_TYPE_INFO); +// assertingTransformation.addInput(elements1.getTransformation(), elements1.getKeySelector()); +// assertingTransformation.addInput(elements2.getTransformation(), elements2.getKeySelector()); +// assertingTransformation.addInput(elements3.getTransformation(), elements3.getKeySelector()); +// +// env.addOperator(assertingTransformation); +// DataStream<Long> counts = new DataStream<>(env, assertingTransformation); +// +// counts.addSink(new DiscardingSink<>()); +// context.execute(); +// } +// +// private static final class ProcessedKeysOrderAsserter implements Serializable { +// private final Set<Integer> seenKeys = new HashSet<>(); +// private long seenRecords = 0; +// private Integer currentKey = null; +// +// public void processElement(Integer element) { +// this.seenRecords++; +// if (!Objects.equals(element, currentKey)) { +// if (!seenKeys.add(element)) { +// Assert.fail("Received an out of order key: " + element); +// } +// currentKey = element; +// } +// } +// +// public long getSeenRecords() { +// return seenRecords; +// } +// } +// +// private static class AssertingOperator extends AbstractStreamOperator<Long> +// implements OneInputStreamOperator<Integer, Long>, BoundedOneInput { +// private final ProcessedKeysOrderAsserter asserter = new ProcessedKeysOrderAsserter(); +// +// @Override +// public void processElement(StreamRecord<Integer> element) { +// asserter.processElement(element.getValue()); +// } +// +// @Override +// public void processStreamStatus(org.apache.flink.streaming.runtime.streamstatus.StreamStatus streamStatus) throws Exception { +// +// } +// +// @Override +// public void endInput() { +// output.collect(new StreamRecord<>(asserter.getSeenRecords())); +// } +// } +// +// private static class AssertingTwoInputOperator extends AbstractStreamOperator<Long> +// implements TwoInputStreamOperator<Integer, Integer, Long>, BoundedMultiInput { +// private final ProcessedKeysOrderAsserter asserter = new ProcessedKeysOrderAsserter(); +// private boolean input1Finished = false; +// private boolean input2Finished = false; +// +// @Override +// public void processElement1(StreamRecord<Integer> element) { +// asserter.processElement(element.getValue()); +// } +// +// @Override +// public void processElement2(StreamRecord<Integer> element) { +// asserter.processElement(element.getValue()); +// } +// +// @Override +// public void processStreamStatus1(org.apache.flink.streaming.runtime.streamstatus.StreamStatus streamStatus) throws Exception { +// +// } +// +// @Override +// public void processStreamStatus2(org.apache.flink.streaming.runtime.streamstatus.StreamStatus streamStatus) throws Exception { +// +// } +// +// @Override +// public void endInput(int inputId) { +// if (inputId == 1) { +// input1Finished = true; +// } +// +// if (inputId == 2) { +// input2Finished = true; +// } +// +// if (input1Finished && input2Finished) { +// output.collect(new StreamRecord<>(asserter.getSeenRecords())); +// } +// } +// } +// +// private static class AssertingThreeInputOperator extends AbstractStreamOperatorV2<Long> +// implements MultipleInputStreamOperator<Long>, BoundedMultiInput { +// private final ProcessedKeysOrderAsserter asserter = new ProcessedKeysOrderAsserter(); +// private boolean input1Finished = false; +// private boolean input2Finished = false; +// private boolean input3Finished = false; +// +// public AssertingThreeInputOperator( +// StreamOperatorParameters<Long> parameters, int numberOfInputs) { +// super(parameters, 3); +// assert numberOfInputs == 3; +// } +// +// @Override +// public void endInput(int inputId) { +// if (inputId == 1) { +// input1Finished = true; +// } +// +// if (inputId == 2) { +// input2Finished = true; +// } +// +// if (inputId == 3) { +// input3Finished = true; +// } +// +// if (input1Finished && input2Finished && input3Finished) { +// output.collect(new StreamRecord<>(asserter.getSeenRecords())); +// } +// } +// +// @Override +// public List<Input> getInputs() { +// return Arrays.asList( +// new SingleInput(asserter::processElement), +// new SingleInput(asserter::processElement), +// new SingleInput(asserter::processElement)); +// } +// } +// +// private static class AssertingThreeInputOperatorFactory implements StreamOperatorFactory<Long> { +// @Override +// @SuppressWarnings("unchecked") +// public <T extends StreamOperator<Long>> T createStreamOperator( +// StreamOperatorParameters<Long> parameters) { +// return (T) new AssertingThreeInputOperator(parameters, 3); +// } +// +// @Override +// public void setChainingStrategy(ChainingStrategy strategy) { +// } +// +// @Override +// public ChainingStrategy getChainingStrategy() { +// return ChainingStrategy.NEVER; +// } +// +// @Override +// public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { +// return AssertingThreeInputOperator.class; +// } +// } +// +// private static class SingleInput implements Input<Integer> { +// +// private final Consumer<Integer> recordConsumer; +// +// private SingleInput(Consumer<Integer> recordConsumer) { +// this.recordConsumer = recordConsumer; +// } +// +// @Override +// public void processElement(StreamRecord<Integer> element) { +// recordConsumer.accept(element.getValue()); +// } +// +// @Override +// public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) { +// } +// +// @Override +// public void processStreamStatus(org.apache.flink.streaming.runtime.streamstatus.StreamStatus streamStatus) throws Exception { +// +// } +// +// @Override +// public void processLatencyMarker(LatencyMarker latencyMarker) { +// } +// +// @Override +// public void setKeyContextElement(StreamRecord<Integer> record) { +// } +// } +// +// private static class InputGenerator extends SplittableIterator<Integer> { +// +// private final long numberOfRecords; +// private long generatedRecords; +// +// private InputGenerator(long numberOfRecords) { +// this.numberOfRecords = numberOfRecords; +// } +// +// @Override +// @SuppressWarnings("unchecked") +// public Iterator<Integer>[] split(int numPartitions) { +// long numberOfRecordsPerPartition = numberOfRecords / numPartitions; +// long remainder = numberOfRecords % numPartitions; +// Iterator<Integer>[] iterators = new Iterator[numPartitions]; +// +// for (int i = 0; i < numPartitions - 1; i++) { +// iterators[i] = new InputGenerator(numberOfRecordsPerPartition); +// } +// +// iterators[numPartitions - 1] = +// new InputGenerator(numberOfRecordsPerPartition + remainder); +// +// return iterators; +// } +// +// @Override +// public int getMaximumNumberOfSplits() { +// return (int) Math.min(numberOfRecords, Integer.MAX_VALUE); +// } +// +// @Override +// public boolean hasNext() { +// return generatedRecords < numberOfRecords; +// } +// +// @Override +// public Integer next() { +// if (hasNext()) { +// generatedRecords++; +// return INDICES.get((int) (generatedRecords % INDICES.size())); +// } +// +// return null; +// } +// } +//}
