This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new 517f1e7 [FLINK-24300] Added benchmarks for idling chained sources
517f1e7 is described below
commit 517f1e7301b6a7086a3d8b70067da88ba6294022
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Sep 16 20:16:20 2021 +0200
[FLINK-24300] Added benchmarks for idling chained sources
This closes #31
---
pom.xml | 6 ++
.../flink/benchmark/MultipleInputBenchmark.java | 106 +++++++++++++++++++--
2 files changed, 105 insertions(+), 7 deletions(-)
diff --git a/pom.xml b/pom.xml
index f0f0bda..f92f166 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,6 +201,12 @@ under the License.
<type>jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ </dependency>
<!-- serialization benchmark requirements -->
diff --git a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
index f070c59..53c6115 100644
--- a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
@@ -18,14 +18,27 @@
package org.apache.flink.benchmark;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.benchmark.functions.LongSource;
import org.apache.flink.benchmark.functions.QueuingLongSource;
import org.apache.flink.benchmark.operators.MultiplyByTwoOperatorFactory;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.openjdk.jmh.annotations.Benchmark;
@@ -36,18 +49,19 @@ import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
-public class MultipleInputBenchmark extends BenchmarkBase {
+import java.util.concurrent.CompletableFuture;
+public class MultipleInputBenchmark extends BenchmarkBase {
public static final int RECORDS_PER_INVOCATION = TwoInputBenchmark.RECORDS_PER_INVOCATION;
public static final int ONE_IDLE_RECORDS_PER_INVOCATION = TwoInputBenchmark.ONE_IDLE_RECORDS_PER_INVOCATION;
public static final long CHECKPOINT_INTERVAL_MS = TwoInputBenchmark.CHECKPOINT_INTERVAL_MS;
public static void main(String[] args)
- throws RunnerException {
+ throws RunnerException {
Options options = new OptionsBuilder()
- .verbosity(VerboseMode.NORMAL)
- .include(".*" + MultipleInputBenchmark.class.getSimpleName() + ".*")
- .build();
+ .verbosity(VerboseMode.NORMAL)
+ .include(".*" + MultipleInputBenchmark.class.getSimpleName() + ".*")
+ .build();
new Runner(options).run();
}
@@ -82,10 +96,88 @@ public class MultipleInputBenchmark extends BenchmarkBase {
env.execute();
}
+ @Benchmark
+ @OperationsPerInvocation(RECORDS_PER_INVOCATION)
+ public void multiInputChainedIdleSource(FlinkEnvironmentContext context) throws Exception {
+ final StreamExecutionEnvironment env = context.env;
+ env.getConfig().enableObjectReuse();
+
+ final DataStream<Long> source1 =
+ env.fromSource(
+ new NumberSequenceSource(1L, RECORDS_PER_INVOCATION),
+ WatermarkStrategy.noWatermarks(),
+ "source-1");
+
+ final DataStreamSource<Integer> source2 =
+ env.fromSource(new IdlingSource(1), WatermarkStrategy.noWatermarks(), "source-2");
+
+ MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
+ "custom operator",
+ new MultiplyByTwoOperatorFactory(),
+ BasicTypeInfo.LONG_TYPE_INFO,
+ 1);
+
+ transform.addInput(((DataStream<?>) source1).getTransformation());
+ transform.addInput(((DataStream<?>) source2).getTransformation());
+ transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+
+ env.addOperator(transform);
+ new MultipleConnectedStreams(env).transform(transform).addSink(new SinkClosingIdlingSource()).setParallelism(1);
+ context.execute();
+ }
+
+ private static class IdlingSource extends MockSource {
+ private static CompletableFuture<Void> canFinish = new CompletableFuture<>();
+
+ public static void signalCanFinish() {
+ canFinish.complete(null);
+ }
+
+ public static void reset() {
+ canFinish.completeExceptionally(new IllegalStateException("State has been reset"));
+ canFinish = new CompletableFuture<>();
+ }
+
+ public IdlingSource(int numSplits) {
+ super(Boundedness.BOUNDED, numSplits, true, true);
+ }
+
+ @Override
+ public SourceReader<Integer, MockSourceSplit> createReader(
+ SourceReaderContext readerContext) {
+ return new MockSourceReader(true, true) {
+ @Override
+ public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) {
+ if (canFinish.isDone() && !canFinish.isCompletedExceptionally()) {
+ return InputStatus.END_OF_INPUT;
+ } else {
+ return InputStatus.NOTHING_AVAILABLE;
+ }
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> isAvailable() {
+ return canFinish;
+ }
+ };
+ }
+ }
+
+ private static class SinkClosingIdlingSource implements SinkFunction<Long> {
+ private int recordsSoFar = 0;
+
+ @Override
+ public void invoke(Long value) {
+ if (++recordsSoFar >= RECORDS_PER_INVOCATION) {
+ IdlingSource.signalCanFinish();
+ }
+ }
+ }
+
private static void connectAndDiscard(
StreamExecutionEnvironment env,
- DataStreamSource<Long> source1,
- DataStreamSource<Long> source2) {
+ DataStream<?> source1,
+ DataStream<?> source2) {
MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
"custom operator",
new MultiplyByTwoOperatorFactory(),