[
https://issues.apache.org/jira/browse/FLINK-30533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677566#comment-17677566
]
Dong Lin commented on FLINK-30533:
----------------------------------
Hi [~pnowojski], thanks for the comments!
Regarding the 1st question, could you tell me which components should also be
updated? Maybe we can change the other components incrementally in addition to
this JIRA.
I might have missed the context behind making sure that "sources are behaving
and handled the same way as network inputs". Could you provide links to related
tickets or discussions?
Regarding the 2nd question, I am not sure we should add
DataOutput#canEmitBatchOfRecords, though. Prior to this PR, DataOutput is
responsible for data flow and it only provides APIs to emit
data/watermark/latency. Classes such as SourceOperator and StreamTask are
responsible for control flow (e.g. mailbox, availability, loop). Having
StreamTask pass mailboxHasMail to SourceOperator preserves this separation of
data flow and control flow.
It seems cleaner and simpler to still have DataOutput be responsible only for
data flow. Could you help explain the benefit of using
DataOutput#canEmitBatchOfRecords?
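
To make the two options concrete, here is a minimal, self-contained sketch of
both loop shapes. It is not Flink code: EmitLoopSketch, Output,
BatchAwareOutput, emitWithFlag and emitWithBatchAwareOutput are hypothetical
names, the source is modelled as a plain Iterator, and the mailbox signal is
modelled as a BooleanSupplier purely for illustration. Only the names
mailboxHasMail and canEmitBatchOfRecords come from the discussion above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

public class EmitLoopSketch {

    /** Hypothetical stand-in for a data-flow-only output: it just receives records. */
    public interface Output<T> {
        void emitRecord(T record);
    }

    /** Hypothetical variant where the output also answers a control-flow question. */
    public interface BatchAwareOutput<T> extends Output<T> {
        boolean canEmitBatchOfRecords();
    }

    /**
     * Option A: the caller (the task) hands the control-flow signal to the loop,
     * so the output stays responsible for data flow only.
     * Returns the number of records emitted in this invocation.
     */
    public static <T> int emitWithFlag(
            Iterator<T> source, Output<T> output, BooleanSupplier mailboxHasMail) {
        int emitted = 0;
        while (source.hasNext()) {
            output.emitRecord(source.next());
            emitted++;
            // Yield back to the caller as soon as the mailbox has pending mail.
            if (mailboxHasMail.getAsBoolean()) {
                break;
            }
        }
        return emitted;
    }

    /**
     * Option B: the loop asks the output itself whether it may keep emitting,
     * which moves a control-flow decision into the output.
     */
    public static <T> int emitWithBatchAwareOutput(
            Iterator<T> source, BatchAwareOutput<T> output) {
        int emitted = 0;
        while (source.hasNext()) {
            output.emitRecord(source.next());
            emitted++;
            if (!output.canEmitBatchOfRecords()) {
                break;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> sink = new ArrayList<>();
        Iterator<Integer> source = List.of(1, 2, 3, 4, 5).iterator();
        // Pretend mail arrives once three records have been emitted.
        int emitted = emitWithFlag(source, sink::add, () -> sink.size() >= 3);
        System.out.println("emitted=" + emitted + ", sink=" + sink);
    }
}
```

In both variants one invocation can push many records, which is the point of
the ticket. The difference is only where the "should I keep going" decision
lives: in a value supplied by the caller, or in a method on the output.
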
> SourceOperator#emitNext() should push records to DataOutput in a while loop
> ---------------------------------------------------------------------------
>
> Key: FLINK-30533
> URL: https://issues.apache.org/jira/browse/FLINK-30533
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Currently, each invocation of SourceOperator#emitNext() pushes at most one
> record to the given DataOutput. This unnecessarily increases the average Java
> call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this
> program, the Flink runtime needs to include these 3 function calls in the
> call stack:
> * StreamTask#processInput()
> * StreamOneInputProcessor#processInput()
> * StreamTaskSourceInput#emitNext()
> {code:java}
> env.fromSequence(1, 1000000000L).map(x -> x).addSink(new DiscardingSink<>());
> {code}
>
> This ticket proposes to update SourceOperator#emitNext() to push records to
> DataOutput in a while loop. It improves Flink performance by removing an
> average of 3 function calls from the call stack needed to produce a record.
> Here are the benchmark results obtained by running the above program with
> parallelism=1 and object re-use enabled. The results are averaged across 5
> runs for each setup.
> * Prior to the proposed change, the average execution time is 46.1 sec with
> std=5.1 sec.
> * After the proposed change, the average execution time is 33.3 sec with
> std=0.9 sec.
> * The proposed change increases throughput by 38.4%.
>