pnowojski commented on a change in pull request #31:
URL: https://github.com/apache/flink-benchmarks/pull/31#discussion_r710976257
##########
File path: src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
##########
@@ -82,10 +98,86 @@ public void multiInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exc
         env.execute();
     }
+    @Benchmark
+    @OperationsPerInvocation(CHAINED_IDLE_RECORDS_PER_INVOCATION)
+    public void multiInputChainedIdleSource(FlinkEnvironmentContext context) throws Exception {
+        final StreamExecutionEnvironment env = context.env;
+        env.getConfig().enableObjectReuse();
+
+        final DataStream<Long> source1 =
+                env.fromSource(
+                        new NumberSequenceSource(1L, CHAINED_IDLE_RECORDS_PER_INVOCATION),
+                        WatermarkStrategy.noWatermarks(),
+                        "source-1");
+
+        final DataStreamSource<Integer> source2 =
+                env.fromSource(new IdlingSource(1), WatermarkStrategy.noWatermarks(), "source-2");
+
+        MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
+                "custom operator",
+                new MultiplyByTwoOperatorFactory(),
+                BasicTypeInfo.LONG_TYPE_INFO,
+                1);
+
+        transform.addInput(((DataStream<?>) source1).getTransformation());
+        transform.addInput(((DataStream<?>) source2).getTransformation());
+        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+
+        env.addOperator(transform);
+        final SingleOutputStreamOperator<Long> stream = new MultipleConnectedStreams(env).transform(transform);
+
+        IdlingSource.reset();
+        try (final CloseableIterator<Long> iterator = stream.executeAndCollect()) {
+            while (iterator.hasNext()) {
+                final Long next = iterator.next();
+                if (next == CHAINED_IDLE_RECORDS_PER_INVOCATION) {
+                    IdlingSource.signalCanFinish();
+                }
+            }
+        }
+    }
Review comment:
```suggestion
        env.addOperator(transform);
        IdlingSource.reset();
        new MultipleConnectedStreams(env)
                .transform(transform)
                .addSink(new SinkClosingIdlingSource())
                .setParallelism(1);
        context.execute();
    }

    private static class SinkClosingIdlingSource implements SinkFunction<Long> {
        private int recordsSoFar = 0;

        @Override
        public void invoke(Long value) throws Exception {
            if (++recordsSoFar >= CHAINED_IDLE_RECORDS_PER_INVOCATION) {
                IdlingSource.signalCanFinish();
            }
        }
    }
```
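For readers without the Flink sources at hand, the counting-sink idea in the suggestion can be sketched in plain Java. This is only an illustration under assumptions: `FinishSignal` is a hypothetical stand-in for the static `IdlingSource.reset()` / `IdlingSource.signalCanFinish()` pair, `CountingSink` mirrors `SinkClosingIdlingSource`, and the idle source is simulated by a thread that waits on a latch. None of these names exist in Flink or in this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountingSinkSketch {

    /** Hypothetical stand-in for IdlingSource's static reset()/signalCanFinish(). */
    static final class FinishSignal {
        private volatile CountDownLatch latch = new CountDownLatch(1);

        void reset() {
            latch = new CountDownLatch(1);
        }

        void signalCanFinish() {
            latch.countDown();
        }

        boolean isSignalled() {
            return latch.getCount() == 0;
        }

        boolean awaitFinish(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }

    /** Counts records and releases the idle source once the expected count is reached. */
    static final class CountingSink {
        private final long expected;
        private final FinishSignal signal;
        private long recordsSoFar = 0;

        CountingSink(long expected, FinishSignal signal) {
            this.expected = expected;
            this.signal = signal;
        }

        void invoke(long value) {
            // The decision depends only on how many records arrived, not on their values.
            if (++recordsSoFar >= expected) {
                signal.signalCanFinish();
            }
        }
    }

    /** Returns true if the simulated idle source was released after all records were consumed. */
    static boolean run(long records) {
        final FinishSignal signal = new FinishSignal();
        signal.reset();
        final CountingSink sink = new CountingSink(records, signal);
        final boolean[] finished = {false};

        // Simulated idle source: emits nothing and only waits for permission to finish.
        Thread idleSource = new Thread(() -> {
            try {
                finished[0] = signal.awaitFinish(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        idleSource.start();

        for (long i = 1; i <= records; i++) {
            sink.invoke(i * 2); // values are doubled, as a multiply-by-two operator would do
        }

        try {
            idleSource.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return finished[0];
    }

    public static void main(String[] args) {
        System.out.println("idle source released: " + run(1_000));
    }
}
```

Because the sink counts records instead of matching a particular value, the signal fires regardless of what the upstream operator does to the payload.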
##########
File path: src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
##########
Review comment:
With this change, the benchmark starts working normally, and you can
safely increase the number of records to 15M or 25M.
Only with this change can you see the true scope of the performance
regression:
without fix: ~7_000 records/s
with fix: ~14_000_000 records/s