pnowojski commented on a change in pull request #8442: [FLINK-12483] Support
(legacy) SourceFunction as special case in the mailbox model for stream tasks
URL: https://github.com/apache/flink/pull/8442#discussion_r284572225
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
##########
@@ -66,21 +66,20 @@ public StreamIterationHead(Environment env) {
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected boolean performDefaultAction() throws Exception {
+	protected void performDefaultAction(ActionContext context) throws Exception {
 		StreamRecord<OUT> nextRecord = shouldWait ?
 			dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
 			dataChannel.take();
 
-		if (nextRecord == null) {
-			return false;
-		}
-
-		synchronized (getCheckpointLock()) {
-			for (RecordWriterOutput<OUT> output : streamOutputs) {
-				output.collect(nextRecord);
+		if (nextRecord != null) {
Review comment:
nit: could the if/else branches be inverted so that the shorter branch comes first?
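
For illustration, the suggested shape might look like the sketch below. It is not the PR's code: the class name, the String record type, the list standing in for streamOutputs, and the allActionsCompleted() method on the stand-in ActionContext are all assumptions made so that the example compiles on its own. The point is only that the short null branch comes first and the longer emit branch follows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, self-contained stand-in for the method under review.
final class InvertedBranchSketch {

    /** Stand-in for the ActionContext parameter; the method name is an assumption. */
    public interface ActionContext {
        void allActionsCompleted();
    }

    private final Object checkpointLock = new Object();
    // Stand-in for the stream outputs that records are forwarded to.
    private final List<String> collected = new ArrayList<>();

    public void performDefaultAction(String nextRecord, ActionContext context) {
        if (nextRecord == null) {
            // Shorter branch first: nothing arrived, so signal completion.
            context.allActionsCompleted();
        } else {
            // Longer branch: emit the record while holding the lock.
            synchronized (checkpointLock) {
                collected.add(nextRecord);
            }
        }
    }

    public List<String> collected() {
        return collected;
    }
}
```

With the null check leading, the condition and its one-line consequence sit next to each other, and the reader does not have to scan past the synchronized block to find the else.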