This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch test1.14
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit de6d62f4e73ca79a39da647ee21de244fc4e154b
Author: Yun Gao <[email protected]>
AuthorDate: Wed Sep 22 21:08:21 2021 +0800

    Fix compile problem
---
 pom.xml                                            |   2 +-
 .../benchmark/SortingBoundedInputBenchmarks.java   | 820 +++++++++++----------
 2 files changed, 422 insertions(+), 400 deletions(-)

diff --git a/pom.xml b/pom.xml
index f92f166..36cf2eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@ under the License.
 
        <properties>
                
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-               <flink.version>1.15-SNAPSHOT</flink.version>
+               <flink.version>1.14-SNAPSHOT</flink.version>
                <flink.shaded.version>14.0</flink.shaded.version>
                <netty.tcnative.version>2.0.39.Final</netty.tcnative.version>
                <java.version>1.8</java.version>
diff --git 
a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java 
b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
index 3a0af48..c12a5e8 100644
--- 
a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
+++ 
b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
@@ -1,399 +1,421 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.benchmark;
-
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.AlgorithmOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
-import org.apache.flink.streaming.api.operators.BoundedMultiInput;
-import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import 
org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.util.SplittableIterator;
-
-import org.junit.Assert;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.runner.Runner;
-import org.openjdk.jmh.runner.RunnerException;
-import org.openjdk.jmh.runner.options.Options;
-import org.openjdk.jmh.runner.options.OptionsBuilder;
-import org.openjdk.jmh.runner.options.VerboseMode;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.openjdk.jmh.annotations.Scope.Thread;
-
-/** An end to end test for sorted inputs for a keyed operator with bounded 
inputs. */
-public class SortingBoundedInputBenchmarks extends BenchmarkBase {
-
-    private static final int RECORDS_PER_INVOCATION = 1_500_000;
-    private static final List<Integer> INDICES =
-            IntStream.range(0, 10).boxed().collect(Collectors.toList());
-
-    static {
-        Collections.shuffle(INDICES);
-    }
-
-    public static void main(String[] args) throws RunnerException {
-        Options options =
-                new OptionsBuilder()
-                        .verbosity(VerboseMode.NORMAL)
-                        .include(
-                                ".*"
-                                        + 
SortingBoundedInputBenchmarks.class.getCanonicalName()
-                                        + ".*")
-                        .build();
-
-        new Runner(options).run();
-    }
-
-    @State(Thread)
-    public static class SortingInputContext extends FlinkEnvironmentContext {
-        @Override
-        protected Configuration createConfiguration() {
-            Configuration configuration = super.createConfiguration();
-            configuration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
-            configuration.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0f);
-            return configuration;
-        }
-    }
-
-    @Benchmark
-    @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
-    public void sortedOneInput(SortingInputContext context) throws Exception {
-        StreamExecutionEnvironment env = context.env;
-
-        DataStreamSource<Integer> elements =
-                env.fromParallelCollection(
-                        new InputGenerator(RECORDS_PER_INVOCATION), 
BasicTypeInfo.INT_TYPE_INFO);
-
-        SingleOutputStreamOperator<Long> counts =
-                elements.keyBy(element -> element)
-                        .transform(
-                                "Asserting operator",
-                                BasicTypeInfo.LONG_TYPE_INFO,
-                                new AssertingOperator());
-
-        counts.addSink(new DiscardingSink<>());
-        context.execute();
-    }
-
-    @Benchmark
-    @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
-    public void sortedTwoInput(SortingInputContext context) throws Exception {
-        StreamExecutionEnvironment env = context.env;
-
-        DataStreamSource<Integer> elements1 =
-                env.fromParallelCollection(
-                        new InputGenerator(RECORDS_PER_INVOCATION / 2),
-                        BasicTypeInfo.INT_TYPE_INFO);
-
-        DataStreamSource<Integer> elements2 =
-                env.fromParallelCollection(
-                        new InputGenerator(RECORDS_PER_INVOCATION / 2),
-                        BasicTypeInfo.INT_TYPE_INFO);
-        SingleOutputStreamOperator<Long> counts =
-                elements1
-                        .connect(elements2)
-                        .keyBy(element -> element, element -> element)
-                        .transform(
-                                "Asserting operator",
-                                BasicTypeInfo.LONG_TYPE_INFO,
-                                new AssertingTwoInputOperator());
-
-        counts.addSink(new DiscardingSink<>());
-        context.execute();
-    }
-
-    @Benchmark
-    @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
-    public void sortedMultiInput(SortingInputContext context) throws Exception 
{
-        StreamExecutionEnvironment env = context.env;
-
-        KeyedStream<Integer, Object> elements1 =
-                env.fromParallelCollection(
-                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
-                                BasicTypeInfo.INT_TYPE_INFO)
-                        .keyBy(el -> el);
-
-        KeyedStream<Integer, Object> elements2 =
-                env.fromParallelCollection(
-                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
-                                BasicTypeInfo.INT_TYPE_INFO)
-                        .keyBy(el -> el);
-
-        KeyedStream<Integer, Object> elements3 =
-                env.fromParallelCollection(
-                                new InputGenerator(RECORDS_PER_INVOCATION / 3),
-                                BasicTypeInfo.INT_TYPE_INFO)
-                        .keyBy(el -> el);
-
-        KeyedMultipleInputTransformation<Long> assertingTransformation =
-                new KeyedMultipleInputTransformation<>(
-                        "Asserting operator",
-                        new AssertingThreeInputOperatorFactory(),
-                        BasicTypeInfo.LONG_TYPE_INFO,
-                        -1,
-                        BasicTypeInfo.INT_TYPE_INFO);
-        assertingTransformation.addInput(elements1.getTransformation(), 
elements1.getKeySelector());
-        assertingTransformation.addInput(elements2.getTransformation(), 
elements2.getKeySelector());
-        assertingTransformation.addInput(elements3.getTransformation(), 
elements3.getKeySelector());
-
-        env.addOperator(assertingTransformation);
-        DataStream<Long> counts = new DataStream<>(env, 
assertingTransformation);
-
-        counts.addSink(new DiscardingSink<>());
-        context.execute();
-    }
-
-    private static final class ProcessedKeysOrderAsserter implements 
Serializable {
-        private final Set<Integer> seenKeys = new HashSet<>();
-        private long seenRecords = 0;
-        private Integer currentKey = null;
-
-        public void processElement(Integer element) {
-            this.seenRecords++;
-            if (!Objects.equals(element, currentKey)) {
-                if (!seenKeys.add(element)) {
-                    Assert.fail("Received an out of order key: " + element);
-                }
-                currentKey = element;
-            }
-        }
-
-        public long getSeenRecords() {
-            return seenRecords;
-        }
-    }
-
-    private static class AssertingOperator extends AbstractStreamOperator<Long>
-            implements OneInputStreamOperator<Integer, Long>, BoundedOneInput {
-        private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
-
-        @Override
-        public void processElement(StreamRecord<Integer> element) {
-            asserter.processElement(element.getValue());
-        }
-
-        @Override
-        public void endInput() {
-            output.collect(new StreamRecord<>(asserter.getSeenRecords()));
-        }
-    }
-
-    private static class AssertingTwoInputOperator extends 
AbstractStreamOperator<Long>
-            implements TwoInputStreamOperator<Integer, Integer, Long>, 
BoundedMultiInput {
-        private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
-        private boolean input1Finished = false;
-        private boolean input2Finished = false;
-
-        @Override
-        public void processElement1(StreamRecord<Integer> element) {
-            asserter.processElement(element.getValue());
-        }
-
-        @Override
-        public void processElement2(StreamRecord<Integer> element) {
-            asserter.processElement(element.getValue());
-        }
-
-        @Override
-        public void endInput(int inputId) {
-            if (inputId == 1) {
-                input1Finished = true;
-            }
-
-            if (inputId == 2) {
-                input2Finished = true;
-            }
-
-            if (input1Finished && input2Finished) {
-                output.collect(new StreamRecord<>(asserter.getSeenRecords()));
-            }
-        }
-    }
-
-    private static class AssertingThreeInputOperator extends 
AbstractStreamOperatorV2<Long>
-            implements MultipleInputStreamOperator<Long>, BoundedMultiInput {
-        private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
-        private boolean input1Finished = false;
-        private boolean input2Finished = false;
-        private boolean input3Finished = false;
-
-        public AssertingThreeInputOperator(
-                StreamOperatorParameters<Long> parameters, int numberOfInputs) 
{
-            super(parameters, 3);
-            assert numberOfInputs == 3;
-        }
-
-        @Override
-        public void endInput(int inputId) {
-            if (inputId == 1) {
-                input1Finished = true;
-            }
-
-            if (inputId == 2) {
-                input2Finished = true;
-            }
-
-            if (inputId == 3) {
-                input3Finished = true;
-            }
-
-            if (input1Finished && input2Finished && input3Finished) {
-                output.collect(new StreamRecord<>(asserter.getSeenRecords()));
-            }
-        }
-
-        @Override
-        public List<Input> getInputs() {
-            return Arrays.asList(
-                    new SingleInput(asserter::processElement),
-                    new SingleInput(asserter::processElement),
-                    new SingleInput(asserter::processElement));
-        }
-    }
-
-    private static class AssertingThreeInputOperatorFactory implements 
StreamOperatorFactory<Long> {
-        @Override
-        @SuppressWarnings("unchecked")
-        public <T extends StreamOperator<Long>> T createStreamOperator(
-                StreamOperatorParameters<Long> parameters) {
-            return (T) new AssertingThreeInputOperator(parameters, 3);
-        }
-
-        @Override
-        public void setChainingStrategy(ChainingStrategy strategy) {}
-
-        @Override
-        public ChainingStrategy getChainingStrategy() {
-            return ChainingStrategy.NEVER;
-        }
-
-        @Override
-        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
-            return AssertingThreeInputOperator.class;
-        }
-    }
-
-    private static class SingleInput implements Input<Integer> {
-
-        private final Consumer<Integer> recordConsumer;
-
-        private SingleInput(Consumer<Integer> recordConsumer) {
-            this.recordConsumer = recordConsumer;
-        }
-
-        @Override
-        public void processElement(StreamRecord<Integer> element) {
-            recordConsumer.accept(element.getValue());
-        }
-
-        @Override
-        public void 
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {}
-
-        @Override
-        public void processLatencyMarker(LatencyMarker latencyMarker) {}
-
-        @Override
-        public void setKeyContextElement(StreamRecord<Integer> record) {}
-
-        @Override
-        public void processWatermarkStatus(WatermarkStatus watermarkStatus) 
throws Exception {}
-    }
-
-    private static class InputGenerator extends SplittableIterator<Integer> {
-
-        private final long numberOfRecords;
-        private long generatedRecords;
-
-        private InputGenerator(long numberOfRecords) {
-            this.numberOfRecords = numberOfRecords;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public Iterator<Integer>[] split(int numPartitions) {
-            long numberOfRecordsPerPartition = numberOfRecords / numPartitions;
-            long remainder = numberOfRecords % numPartitions;
-            Iterator<Integer>[] iterators = new Iterator[numPartitions];
-
-            for (int i = 0; i < numPartitions - 1; i++) {
-                iterators[i] = new InputGenerator(numberOfRecordsPerPartition);
-            }
-
-            iterators[numPartitions - 1] =
-                    new InputGenerator(numberOfRecordsPerPartition + 
remainder);
-
-            return iterators;
-        }
-
-        @Override
-        public int getMaximumNumberOfSplits() {
-            return (int) Math.min(numberOfRecords, Integer.MAX_VALUE);
-        }
-
-        @Override
-        public boolean hasNext() {
-            return generatedRecords < numberOfRecords;
-        }
-
-        @Override
-        public Integer next() {
-            if (hasNext()) {
-                generatedRecords++;
-                return INDICES.get((int) (generatedRecords % INDICES.size()));
-            }
-
-            return null;
-        }
-    }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.benchmark;
+//
+//import org.apache.flink.api.common.RuntimeExecutionMode;
+//import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+//import org.apache.flink.configuration.AlgorithmOptions;
+//import org.apache.flink.configuration.Configuration;
+//import org.apache.flink.configuration.ExecutionOptions;
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.datastream.DataStreamSource;
+//import org.apache.flink.streaming.api.datastream.KeyedStream;
+//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+//import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+//import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+//import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+//import org.apache.flink.streaming.api.operators.BoundedOneInput;
+//import org.apache.flink.streaming.api.operators.ChainingStrategy;
+//import org.apache.flink.streaming.api.operators.Input;
+//import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+//import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+//import org.apache.flink.streaming.api.operators.StreamOperator;
+//import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+//import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+//import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+//import 
org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
+//import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+//import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+//import org.apache.flink.util.SplittableIterator;
+//
+//import org.junit.Assert;
+//import org.openjdk.jmh.annotations.Benchmark;
+//import org.openjdk.jmh.annotations.OperationsPerInvocation;
+//import org.openjdk.jmh.annotations.State;
+//import org.openjdk.jmh.runner.Runner;
+//import org.openjdk.jmh.runner.RunnerException;
+//import org.openjdk.jmh.runner.options.Options;
+//import org.openjdk.jmh.runner.options.OptionsBuilder;
+//import org.openjdk.jmh.runner.options.VerboseMode;
+//
+//import java.io.Serializable;
+//import java.util.Arrays;
+//import java.util.Collections;
+//import java.util.HashSet;
+//import java.util.Iterator;
+//import java.util.List;
+//import java.util.Objects;
+//import java.util.Set;
+//import java.util.function.Consumer;
+//import java.util.stream.Collectors;
+//import java.util.stream.IntStream;
+//
+//import static org.openjdk.jmh.annotations.Scope.Thread;
+//
+///**
+// * An end to end test for sorted inputs for a keyed operator with bounded 
inputs.
+// */
+//public class SortingBoundedInputBenchmarks extends BenchmarkBase {
+//
+//     private static final int RECORDS_PER_INVOCATION = 1_500_000;
+//     private static final List<Integer> INDICES =
+//             IntStream.range(0, 10).boxed().collect(Collectors.toList());
+//
+//     static {
+//             Collections.shuffle(INDICES);
+//     }
+//
+//     public static void main(String[] args) throws RunnerException {
+//             Options options =
+//                     new OptionsBuilder()
+//                             .verbosity(VerboseMode.NORMAL)
+//                             .include(
+//                                     ".*"
+//                                             + 
SortingBoundedInputBenchmarks.class.getCanonicalName()
+//                                             + ".*")
+//                             .build();
+//
+//             new Runner(options).run();
+//     }
+//
+//     @State(Thread)
+//     public static class SortingInputContext extends FlinkEnvironmentContext 
{
+//             @Override
+//             protected Configuration createConfiguration() {
+//                     Configuration configuration = 
super.createConfiguration();
+//                     configuration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+//                     
configuration.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, 0f);
+//                     return configuration;
+//             }
+//     }
+//
+//     @Benchmark
+//     @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+//     public void sortedOneInput(SortingInputContext context) throws 
Exception {
+//             StreamExecutionEnvironment env = context.env;
+//
+//             DataStreamSource<Integer> elements =
+//                     env.fromParallelCollection(
+//                             new InputGenerator(RECORDS_PER_INVOCATION), 
BasicTypeInfo.INT_TYPE_INFO);
+//
+//             SingleOutputStreamOperator<Long> counts =
+//                     elements.keyBy(element -> element)
+//                             .transform(
+//                                     "Asserting operator",
+//                                     BasicTypeInfo.LONG_TYPE_INFO,
+//                                     new AssertingOperator());
+//
+//             counts.addSink(new DiscardingSink<>());
+//             context.execute();
+//     }
+//
+//     @Benchmark
+//     @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+//     public void sortedTwoInput(SortingInputContext context) throws 
Exception {
+//             StreamExecutionEnvironment env = context.env;
+//
+//             DataStreamSource<Integer> elements1 =
+//                     env.fromParallelCollection(
+//                             new InputGenerator(RECORDS_PER_INVOCATION / 2),
+//                             BasicTypeInfo.INT_TYPE_INFO);
+//
+//             DataStreamSource<Integer> elements2 =
+//                     env.fromParallelCollection(
+//                             new InputGenerator(RECORDS_PER_INVOCATION / 2),
+//                             BasicTypeInfo.INT_TYPE_INFO);
+//             SingleOutputStreamOperator<Long> counts =
+//                     elements1
+//                             .connect(elements2)
+//                             .keyBy(element -> element, element -> element)
+//                             .transform(
+//                                     "Asserting operator",
+//                                     BasicTypeInfo.LONG_TYPE_INFO,
+//                                     new AssertingTwoInputOperator());
+//
+//             counts.addSink(new DiscardingSink<>());
+//             context.execute();
+//     }
+//
+//     @Benchmark
+//     @OperationsPerInvocation(value = RECORDS_PER_INVOCATION)
+//     public void sortedMultiInput(SortingInputContext context) throws 
Exception {
+//             StreamExecutionEnvironment env = context.env;
+//
+//             KeyedStream<Integer, Object> elements1 =
+//                     env.fromParallelCollection(
+//                             new InputGenerator(RECORDS_PER_INVOCATION / 3),
+//                             BasicTypeInfo.INT_TYPE_INFO)
+//                             .keyBy(el -> el);
+//
+//             KeyedStream<Integer, Object> elements2 =
+//                     env.fromParallelCollection(
+//                             new InputGenerator(RECORDS_PER_INVOCATION / 3),
+//                             BasicTypeInfo.INT_TYPE_INFO)
+//                             .keyBy(el -> el);
+//
+//             KeyedStream<Integer, Object> elements3 =
+//                     env.fromParallelCollection(
+//                             new InputGenerator(RECORDS_PER_INVOCATION / 3),
+//                             BasicTypeInfo.INT_TYPE_INFO)
+//                             .keyBy(el -> el);
+//
+//             KeyedMultipleInputTransformation<Long> assertingTransformation =
+//                     new KeyedMultipleInputTransformation<>(
+//                             "Asserting operator",
+//                             new AssertingThreeInputOperatorFactory(),
+//                             BasicTypeInfo.LONG_TYPE_INFO,
+//                             -1,
+//                             BasicTypeInfo.INT_TYPE_INFO);
+//             assertingTransformation.addInput(elements1.getTransformation(), 
elements1.getKeySelector());
+//             assertingTransformation.addInput(elements2.getTransformation(), 
elements2.getKeySelector());
+//             assertingTransformation.addInput(elements3.getTransformation(), 
elements3.getKeySelector());
+//
+//             env.addOperator(assertingTransformation);
+//             DataStream<Long> counts = new DataStream<>(env, 
assertingTransformation);
+//
+//             counts.addSink(new DiscardingSink<>());
+//             context.execute();
+//     }
+//
+//     private static final class ProcessedKeysOrderAsserter implements 
Serializable {
+//             private final Set<Integer> seenKeys = new HashSet<>();
+//             private long seenRecords = 0;
+//             private Integer currentKey = null;
+//
+//             public void processElement(Integer element) {
+//                     this.seenRecords++;
+//                     if (!Objects.equals(element, currentKey)) {
+//                             if (!seenKeys.add(element)) {
+//                                     Assert.fail("Received an out of order 
key: " + element);
+//                             }
+//                             currentKey = element;
+//                     }
+//             }
+//
+//             public long getSeenRecords() {
+//                     return seenRecords;
+//             }
+//     }
+//
+//     private static class AssertingOperator extends 
AbstractStreamOperator<Long>
+//             implements OneInputStreamOperator<Integer, Long>, 
BoundedOneInput {
+//             private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+//
+//             @Override
+//             public void processElement(StreamRecord<Integer> element) {
+//                     asserter.processElement(element.getValue());
+//             }
+//
+//             @Override
+//             public void 
processStreamStatus(org.apache.flink.streaming.runtime.streamstatus.StreamStatus
 streamStatus) throws Exception {
+//
+//             }
+//
+//             @Override
+//             public void endInput() {
+//                     output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
+//             }
+//     }
+//
+//     private static class AssertingTwoInputOperator extends 
AbstractStreamOperator<Long>
+//             implements TwoInputStreamOperator<Integer, Integer, Long>, 
BoundedMultiInput {
+//             private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+//             private boolean input1Finished = false;
+//             private boolean input2Finished = false;
+//
+//             @Override
+//             public void processElement1(StreamRecord<Integer> element) {
+//                     asserter.processElement(element.getValue());
+//             }
+//
+//             @Override
+//             public void processElement2(StreamRecord<Integer> element) {
+//                     asserter.processElement(element.getValue());
+//             }
+//
+//             @Override
+//             public void 
processStreamStatus1(org.apache.flink.streaming.runtime.streamstatus.StreamStatus
 streamStatus) throws Exception {
+//
+//             }
+//
+//             @Override
+//             public void 
processStreamStatus2(org.apache.flink.streaming.runtime.streamstatus.StreamStatus
 streamStatus) throws Exception {
+//
+//             }
+//
+//             @Override
+//             public void endInput(int inputId) {
+//                     if (inputId == 1) {
+//                             input1Finished = true;
+//                     }
+//
+//                     if (inputId == 2) {
+//                             input2Finished = true;
+//                     }
+//
+//                     if (input1Finished && input2Finished) {
+//                             output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
+//                     }
+//             }
+//     }
+//
+//     private static class AssertingThreeInputOperator extends 
AbstractStreamOperatorV2<Long>
+//             implements MultipleInputStreamOperator<Long>, BoundedMultiInput 
{
+//             private final ProcessedKeysOrderAsserter asserter = new 
ProcessedKeysOrderAsserter();
+//             private boolean input1Finished = false;
+//             private boolean input2Finished = false;
+//             private boolean input3Finished = false;
+//
+//             public AssertingThreeInputOperator(
+//                     StreamOperatorParameters<Long> parameters, int 
numberOfInputs) {
+//                     super(parameters, 3);
+//                     assert numberOfInputs == 3;
+//             }
+//
+//             @Override
+//             public void endInput(int inputId) {
+//                     if (inputId == 1) {
+//                             input1Finished = true;
+//                     }
+//
+//                     if (inputId == 2) {
+//                             input2Finished = true;
+//                     }
+//
+//                     if (inputId == 3) {
+//                             input3Finished = true;
+//                     }
+//
+//                     if (input1Finished && input2Finished && input3Finished) 
{
+//                             output.collect(new 
StreamRecord<>(asserter.getSeenRecords()));
+//                     }
+//             }
+//
+//             @Override
+//             public List<Input> getInputs() {
+//                     return Arrays.asList(
+//                             new SingleInput(asserter::processElement),
+//                             new SingleInput(asserter::processElement),
+//                             new SingleInput(asserter::processElement));
+//             }
+//     }
+//
+//     private static class AssertingThreeInputOperatorFactory implements 
StreamOperatorFactory<Long> {
+//             @Override
+//             @SuppressWarnings("unchecked")
+//             public <T extends StreamOperator<Long>> T createStreamOperator(
+//                     StreamOperatorParameters<Long> parameters) {
+//                     return (T) new AssertingThreeInputOperator(parameters, 
3);
+//             }
+//
+//             @Override
+//             public void setChainingStrategy(ChainingStrategy strategy) {
+//             }
+//
+//             @Override
+//             public ChainingStrategy getChainingStrategy() {
+//                     return ChainingStrategy.NEVER;
+//             }
+//
+//             @Override
+//             public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+//                     return AssertingThreeInputOperator.class;
+//             }
+//     }
+//
+//     private static class SingleInput implements Input<Integer> {
+//
+//             private final Consumer<Integer> recordConsumer;
+//
+//             private SingleInput(Consumer<Integer> recordConsumer) {
+//                     this.recordConsumer = recordConsumer;
+//             }
+//
+//             @Override
+//             public void processElement(StreamRecord<Integer> element) {
+//                     recordConsumer.accept(element.getValue());
+//             }
+//
+//             @Override
+//             public void 
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
+//             }
+//
+//             @Override
+//             public void 
processStreamStatus(org.apache.flink.streaming.runtime.streamstatus.StreamStatus
 streamStatus) throws Exception {
+//
+//             }
+//
+//             @Override
+//             public void processLatencyMarker(LatencyMarker latencyMarker) {
+//             }
+//
+//             @Override
+//             public void setKeyContextElement(StreamRecord<Integer> record) {
+//             }
+//     }
+//
+//     private static class InputGenerator extends SplittableIterator<Integer> 
{
+//
+//             private final long numberOfRecords;
+//             private long generatedRecords;
+//
+//             private InputGenerator(long numberOfRecords) {
+//                     this.numberOfRecords = numberOfRecords;
+//             }
+//
+//             @Override
+//             @SuppressWarnings("unchecked")
+//             public Iterator<Integer>[] split(int numPartitions) {
+//                     long numberOfRecordsPerPartition = numberOfRecords / 
numPartitions;
+//                     long remainder = numberOfRecords % numPartitions;
+//                     Iterator<Integer>[] iterators = new 
Iterator[numPartitions];
+//
+//                     for (int i = 0; i < numPartitions - 1; i++) {
+//                             iterators[i] = new 
InputGenerator(numberOfRecordsPerPartition);
+//                     }
+//
+//                     iterators[numPartitions - 1] =
+//                             new InputGenerator(numberOfRecordsPerPartition 
+ remainder);
+//
+//                     return iterators;
+//             }
+//
+//             @Override
+//             public int getMaximumNumberOfSplits() {
+//                     return (int) Math.min(numberOfRecords, 
Integer.MAX_VALUE);
+//             }
+//
+//             @Override
+//             public boolean hasNext() {
+//                     return generatedRecords < numberOfRecords;
+//             }
+//
+//             @Override
+//             public Integer next() {
+//                     if (hasNext()) {
+//                             generatedRecords++;
+//                             return INDICES.get((int) (generatedRecords % 
INDICES.size()));
+//                     }
+//
+//                     return null;
+//             }
+//     }
+//}

Reply via email to