dawidwys commented on a change in pull request #31:
URL: https://github.com/apache/flink-benchmarks/pull/31#discussion_r710998927



##########
File path: src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
##########
@@ -82,10 +98,86 @@ public void 
multiInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exc
                env.execute();
        }
 
+       @Benchmark
+       @OperationsPerInvocation(CHAINED_IDLE_RECORDS_PER_INVOCATION)
+       public void multiInputChainedIdleSource(FlinkEnvironmentContext 
context) throws Exception {
+               final StreamExecutionEnvironment env = context.env;
+               env.getConfig().enableObjectReuse();
+
+               final DataStream<Long> source1 =
+                               env.fromSource(
+                                               new NumberSequenceSource(1L, 
CHAINED_IDLE_RECORDS_PER_INVOCATION),
+                                               
WatermarkStrategy.noWatermarks(),
+                                               "source-1");
+
+               final DataStreamSource<Integer> source2 =
+                               env.fromSource(new IdlingSource(1), 
WatermarkStrategy.noWatermarks(), "source-2");
+
+               MultipleInputTransformation<Long> transform = new 
MultipleInputTransformation<>(
+                               "custom operator",
+                               new MultiplyByTwoOperatorFactory(),
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               1);
+
+               transform.addInput(((DataStream<?>) 
source1).getTransformation());
+               transform.addInput(((DataStream<?>) 
source2).getTransformation());
+               
transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+
+               env.addOperator(transform);
+               final SingleOutputStreamOperator<Long> stream = new 
MultipleConnectedStreams(env).transform(transform);
+
+               IdlingSource.reset();
+               try (final CloseableIterator<Long> iterator = 
stream.executeAndCollect()) {
+                       while (iterator.hasNext()) {
+                               final Long next = iterator.next();
+                               if (next == 
CHAINED_IDLE_RECORDS_PER_INVOCATION) {
+                                       IdlingSource.signalCanFinish();
+                               }
+                       }
+               }
+       }

Review comment:
       At least in my setup I still can not run it with more than 5 digit 
number of records without the fix.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to