dawidwys commented on a change in pull request #31:
URL: https://github.com/apache/flink-benchmarks/pull/31#discussion_r710998927
##########
File path: src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
##########
@@ -82,10 +98,86 @@ public void multiInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exc
         env.execute();
     }
+    @Benchmark
+    @OperationsPerInvocation(CHAINED_IDLE_RECORDS_PER_INVOCATION)
+    public void multiInputChainedIdleSource(FlinkEnvironmentContext context) throws Exception {
+        final StreamExecutionEnvironment env = context.env;
+        env.getConfig().enableObjectReuse();
+
+        final DataStream<Long> source1 =
+                env.fromSource(
+                        new NumberSequenceSource(1L, CHAINED_IDLE_RECORDS_PER_INVOCATION),
+                        WatermarkStrategy.noWatermarks(),
+                        "source-1");
+
+        final DataStreamSource<Integer> source2 =
+                env.fromSource(new IdlingSource(1), WatermarkStrategy.noWatermarks(), "source-2");
+
+        MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
+                "custom operator",
+                new MultiplyByTwoOperatorFactory(),
+                BasicTypeInfo.LONG_TYPE_INFO,
+                1);
+
+        transform.addInput(((DataStream<?>) source1).getTransformation());
+        transform.addInput(((DataStream<?>) source2).getTransformation());
+        transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+
+        env.addOperator(transform);
+        final SingleOutputStreamOperator<Long> stream = new MultipleConnectedStreams(env).transform(transform);
+
+        IdlingSource.reset();
+        try (final CloseableIterator<Long> iterator = stream.executeAndCollect()) {
+            while (iterator.hasNext()) {
+                final Long next = iterator.next();
+                if (next == CHAINED_IDLE_RECORDS_PER_INVOCATION) {
+                    IdlingSource.signalCanFinish();
+                }
+            }
+        }
+    }
Review comment:
At least in my setup, I still cannot run it with a record count of
more than five digits without the fix.