pnowojski commented on a change in pull request #31:
URL: https://github.com/apache/flink-benchmarks/pull/31#discussion_r710976257



##########
File path: src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
##########
@@ -82,10 +98,86 @@ public void multiInputOneIdleMapSink(FlinkEnvironmentContext context) throws Exc
                env.execute();
        }
 
+       @Benchmark
+       @OperationsPerInvocation(CHAINED_IDLE_RECORDS_PER_INVOCATION)
+       public void multiInputChainedIdleSource(FlinkEnvironmentContext context) throws Exception {
+               final StreamExecutionEnvironment env = context.env;
+               env.getConfig().enableObjectReuse();
+
+               final DataStream<Long> source1 =
+                               env.fromSource(
+                                               new NumberSequenceSource(1L, CHAINED_IDLE_RECORDS_PER_INVOCATION),
+                                               WatermarkStrategy.noWatermarks(),
+                                               "source-1");
+
+               final DataStreamSource<Integer> source2 =
+                               env.fromSource(new IdlingSource(1), WatermarkStrategy.noWatermarks(), "source-2");
+
+               MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
+                               "custom operator",
+                               new MultiplyByTwoOperatorFactory(),
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               1);
+
+               transform.addInput(((DataStream<?>) source1).getTransformation());
+               transform.addInput(((DataStream<?>) source2).getTransformation());
+               transform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
+
+               env.addOperator(transform);
+               final SingleOutputStreamOperator<Long> stream = new MultipleConnectedStreams(env).transform(transform);
+
+               IdlingSource.reset();
+               try (final CloseableIterator<Long> iterator = stream.executeAndCollect()) {
+                       while (iterator.hasNext()) {
+                               final Long next = iterator.next();
+                               if (next == CHAINED_IDLE_RECORDS_PER_INVOCATION) {
+                                       IdlingSource.signalCanFinish();
+                               }
+                       }
+               }
+       }

Review comment:
       ```suggestion
           env.addOperator(transform);
   
           IdlingSource.reset();
           new MultipleConnectedStreams(env)
                   .transform(transform)
                   .addSink(new SinkClosingIdlingSource())
                   .setParallelism(1);
           context.execute();
       }
   
       private static class SinkClosingIdlingSource implements SinkFunction<Long> {
           private int recordsSoFar = 0;
   
           @Override
           public void invoke(Long value) throws Exception {
               if (++recordsSoFar >= CHAINED_IDLE_RECORDS_PER_INVOCATION) {
                   IdlingSource.signalCanFinish();
               }
           }
       }
   ```
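
The pattern in the suggestion (a sink that counts records and releases the idle source once the expected count is reached) can be tried out without a Flink cluster. The following is only a minimal sketch under stated assumptions: `CountingSinkSketch`, `IdleSignal` and `CountingSink` are hypothetical stand-ins built on plain Java concurrency, not Flink classes, and the idle source is modelled as a thread that only waits to be released.

```java
import java.util.concurrent.CountDownLatch;

// Framework-free sketch. Every name below is hypothetical; none of it is Flink API.
final class CountingSinkSketch {

    // Stand-in for the static reset()/signalCanFinish() pair on IdlingSource.
    static final class IdleSignal {
        private volatile CountDownLatch canFinish = new CountDownLatch(1);

        void reset() {
            canFinish = new CountDownLatch(1);
        }

        void signalCanFinish() {
            canFinish.countDown();
        }

        void awaitCanFinish() throws InterruptedException {
            canFinish.await();
        }
    }

    // Stand-in for SinkClosingIdlingSource: counts records and releases the idle source.
    static final class CountingSink {
        private final long expectedRecords;
        private final IdleSignal signal;
        private long recordsSoFar = 0;

        CountingSink(long expectedRecords, IdleSignal signal) {
            this.expectedRecords = expectedRecords;
            this.signal = signal;
        }

        void invoke(long value) {
            // The value itself is ignored; only the number of records seen matters.
            if (++recordsSoFar >= expectedRecords) {
                signal.signalCanFinish();
            }
        }

        long count() {
            return recordsSoFar;
        }
    }

    // Pushes `records` values through the sink and returns how many the sink counted.
    static long run(long records) {
        if (records < 1) {
            throw new IllegalArgumentException("records must be >= 1, or the idle source is never released");
        }
        IdleSignal signal = new IdleSignal();
        signal.reset();
        CountingSink sink = new CountingSink(records, signal);

        // The idle "source" emits nothing and only waits until it is allowed to finish.
        Thread idleSource = new Thread(() -> {
            try {
                signal.awaitCanFinish();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        idleSource.start();

        // The active "source" feeds the sink on the calling thread.
        for (long i = 1; i <= records; i++) {
            sink.invoke(i * 2);
        }

        try {
            idleSource.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the idle source", e);
        }
        return sink.count();
    }

    public static void main(String[] args) {
        System.out.println(run(1_000_000));
    }
}
```

In this sketch the termination check depends on how many records reached the sink, not on the value of any particular record, which is the same condition the suggested `SinkClosingIdlingSource` uses.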

Review comment:
   With this change the benchmark starts working normally, and you can safely increase the number of records to 15M or 25M.
   
   Only with this change can you see the true scope of the performance regression:
   without fix: ~7_000 records/s
   with fix: ~14_000_000 records/s
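
To put the two figures in proportion, here is a small arithmetic sketch. It takes the approximate throughputs quoted above at face value; the class and method names are hypothetical, and the run times for 15M records are derived from those figures, not measured.

```java
// Arithmetic sketch only; the inputs are the approximate figures quoted in the comment above.
final class ThroughputArithmetic {

    // Ratio of the two throughputs.
    static double speedup(double withFixPerSecond, double withoutFixPerSecond) {
        return withFixPerSecond / withoutFixPerSecond;
    }

    // Time needed to push the given number of records at a given throughput.
    static double secondsFor(long records, double recordsPerSecond) {
        return records / recordsPerSecond;
    }

    public static void main(String[] args) {
        double withoutFix = 7_000;
        double withFix = 14_000_000;
        long records = 15_000_000L;

        System.out.printf("speedup: %.0fx%n", speedup(withFix, withoutFix));
        System.out.printf("15M records without fix: %.0f s%n", secondsFor(records, withoutFix));
        System.out.printf("15M records with fix: %.2f s%n", secondsFor(records, withFix));
    }
}
```

On these numbers the gap is a factor of about 2000: 15M records would take roughly 36 minutes at ~7_000 records/s and about one second at ~14_000_000 records/s.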



